libzypp  17.31.31
provide.cc
Go to the documentation of this file.
1 #include "private/provide_p.h"
2 #include "private/providedbg_p.h"
5 #include <zypp-core/zyppng/io/IODevice>
6 #include <zypp-core/Url.h>
7 #include <zypp-core/base/DtorReset>
8 #include <zypp-core/fs/PathInfo.h>
9 #include <zypp-media/MediaException>
10 #include <zypp-media/FileCheckException>
11 #include <zypp-media/CDTools>
12 
13 // required to generate uuids
14 #include <glib.h>
15 
16 
17 L_ENV_CONSTR_DEFINE_FUNC(ZYPP_MEDIA_PROVIDER_DEBUG)
18 
19 namespace zyppng {
20 
22  : BasePrivate(pub)
23  , _workDir( std::move(workDir) )
24  , _workerPath( constants::DEFAULT_PROVIDE_WORKER_PATH.data() )
25  {
26  if ( _workDir.empty() ) {
28  } else {
30  }
31 
32  MIL << "Provider workdir is: " << _workDir << std::endl;
33 
34  _scheduleTrigger->setSingleShot(true);
35  Base::connect( *_scheduleTrigger, &Timer::sigExpired, *this, &ProvidePrivate::doSchedule );
36  }
37 
39  {
40  if ( provideDebugEnabled () ) {
41  std::string_view reasonStr;
42  switch( reason ) {
43  case ProvideStart:
44  reasonStr = "ProvideStart";
45  break;
46  case QueueIdle:
47  reasonStr = "QueueIdle";
48  break;
49  case EnqueueItem:
50  reasonStr = "EnqueueItem";
51  break;
52  case EnqueueReq:
53  reasonStr = "EnqueueReq";
54  break;
55  case FinishReq:
56  reasonStr = "FinishReq";
57  break;
58  case RestartAttach:
59  reasonStr = "RestartAttach";
60  break;
61  }
62  DBG << "Triggering the schedule timer (" << reasonStr << ")" << std::endl;
63  }
64 
65  // we use a single-shot timer that times out instantly the next time the event loop is entered;
66  // this way we compress the many schedule requests that happen during one event loop run into a single one
67  _scheduleTrigger->start(0);
68  }
69 
70  void ProvidePrivate::doSchedule ( zyppng::Timer & )
71  {
72  if ( !_isRunning )
73  return;
74 
75  if ( _isScheduling ) {
76  DBG_PRV << "Scheduling triggered during scheduling, returning immediately." << std::endl;
77  return;
78  }
79 
80  const int cpuLimit =
81 #ifdef _SC_NPROCESSORS_ONLN
82  sysconf(_SC_NPROCESSORS_ONLN) * 2;
83 #else
85 #endif
86 
87  // helper lambda to find the worker that is idle for the longest time
88  constexpr auto findLaziestWorker = []( const auto &workerQueues, const auto &idleNames ) {
89  auto candidate = workerQueues.end();
90  ProvideQueue::TimePoint candidateIdleSince = ProvideQueue::TimePoint::max();
91 
92  //find the worker thats idle the longest
93  for ( const auto &name : idleNames ) {
94  auto thisElem = workerQueues.find(name);
95  if ( thisElem == workerQueues.end() )
96  continue;
97 
98  const auto idleS = thisElem->second->idleSince();
99  if ( idleS
100  && ( candidate == workerQueues.end() || *idleS < candidateIdleSince ) ) {
101  candidateIdleSince = *idleS;
102  candidate = thisElem;
103  }
104  }
105 
106  if ( candidate != workerQueues.end() )
107  MIL_PRV << "Found idle worker:" << candidate->first << " idle since: " << candidateIdleSince.time_since_epoch().count() << std::endl;
108 
109  return candidate;
110  };
111 
112  // clean up old media
113 
114  for ( auto iMedia = _attachedMediaInfos.begin(); iMedia != _attachedMediaInfos.end(); ) {
115  if ( iMedia->_refCount > 0 ) {
116  MIL_PRV << "Not releasing media " << iMedia->_name << " refcount is not zero" << std::endl;
117  ++iMedia;
118  continue;
119  }
120  if ( iMedia->_workerType == ProvideQueue::Config::Downloading ) {
121  // we keep the information around for an hour so we do not constantly download the media files for no reason
122  if ( std::chrono::steady_clock::now() - iMedia->_idleSince >= std::chrono::hours(1) ) {
123  MIL << "Detaching medium " << iMedia->_name << " for baseUrl " << iMedia->_attachedUrl << std::endl;
124  iMedia = _attachedMediaInfos.erase(iMedia);
125  continue;
126  } else {
127  MIL_PRV << "Not releasing media " << iMedia->_name << " downloading worker and not timed out yet." << std::endl;
128  }
129  } else {
130  // mounting handlers, we need to send a request to the workers
131  auto bQueue = iMedia->_backingQueue.lock();
132  if ( bQueue ) {
133  zypp::Url url = iMedia->_attachedUrl;
134  url.setScheme( url.getScheme() + std::string( constants::ATTACHED_MEDIA_SUFFIX) );
135  url.setAuthority( iMedia->_name );
136  const auto &req = ProvideRequest::createDetach( url );
137  if ( req ) {
138  MIL << "Detaching medium " << iMedia->_name << " for baseUrl " << iMedia->_attachedUrl << std::endl;
139  bQueue->enqueue ( *req );
140  iMedia = _attachedMediaInfos.erase(iMedia);
141  continue;
142  } else {
143  ERR << "Could not send detach request, creating the request failed" << std::endl;
144  }
145  } else {
146  ERR << "Could not send detach request since no backing queue was defined" << std::endl;
147  }
148  }
149  ++iMedia;
150  }
151 
152  zypp::DtorReset schedFlag( _isScheduling, false );
153  _isScheduling = true;
154 
155  const auto schedStart = std::chrono::steady_clock::now();
156  MIL_PRV << "Start scheduling" << std::endl;
157 
158  zypp::OnScopeExit deferExitMessage( [&](){
159  const auto dur = std::chrono::steady_clock::now() - schedStart;
160  MIL_PRV << "Exit scheduling after:" << std::chrono::duration_cast<std::chrono::milliseconds>( dur ).count () << std::endl;
161  });
162 
163  // bump inactive items
164  for ( auto it = _items.begin (); it != _items.end(); ) {
165  // was maybe released during scheduling
166  if ( !(*it) )
167  it = _items.erase(it);
168  else {
169  auto &item = *it;
170  if ( item->state() == ProvideItem::Uninitialized ) {
171  item->initialize();
172  }
173  it++;
174  }
175  }
176 
177  // we are scheduling now, everything that triggered the timer until now we can forget about
178  _scheduleTrigger->stop();
179 
180  for( auto queueIter = _queues.begin(); queueIter != _queues.end(); queueIter ++ ) {
181 
182  const auto &scheme = queueIter->_schemeName;
183  auto &queue = queueIter->_requests;
184 
185  if ( !queue.size() )
186  continue;
187 
188  const auto &configOpt = schemeConfig ( scheme );
189 
190  MIL_PRV << "Start scheduling for scheme:" << scheme << " queue size is: " << queue.size() << std::endl;
191 
192  if ( !configOpt ) {
193  // FAIL all requests in this queue
194  ERR << "Scheme: " << scheme << " failed to return a valid configuration." << std::endl;
195 
196  while( queue.size() ) {
197  auto item = std::move( queue.front() );
198  queue.pop_front();
199  if ( item->owner() )
200  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("Failed to query scheme config.")) );
201  }
202 
203  continue;
204  }
205 
206  // the scheme config that defines how we schedule requests on this set of queues
207  const auto &config = configOpt.get();
208  const auto isSingleInstance = ( (config.cfg_flags() & ProvideQueue::Config::SingleInstance) == ProvideQueue::Config::SingleInstance );
209  if ( config.worker_type() == ProvideQueue::Config::Downloading && !isSingleInstance ) {
210 
211  for( auto i = queue.begin (); i != queue.end(); ) {
212 
213  // this is the only place where we remove elements from the queue when the scheduling flag is active
214  // other code just nulls out requests in the queue if during scheduling items need to be removed
215  while ( i != queue.end() && !(*i) ) {
216  i = queue.erase(i);
217  }
218 
219  if ( i == queue.end() )
220  break;
221 
222  ProvideRequestRef item = *i;
223 
224  // Downloading queues do not support attaching via a AttachRequest, this is handled by simply providing the media verification files
225  // If we hit this code path, its a bug
226  if( item->code() == ProvideMessage::Code::Attach || item->code() == ProvideMessage::Code::Detach ) {
227  i = queue.erase(i);
228  if ( item->owner() )
229  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Downloading Queues do not support ProvideMessage::Code::Attach requests") ) );
230  continue;
231  }
232 
233  MIL_PRV << "Trying to schedule request: " << item->urls().front() << std::endl;
234 
235  // how many workers for this type do already exist
236  int existingTypeWorkers = 0;
237 
238  // how many currently active connections are there
239  int existingConnections = 0;
240 
241  // all currently available possible queues for the request
242  std::vector< std::pair<zypp::Url, ProvideQueue*> > possibleHostWorkers;
243 
244  // currently idle workers
245  std::vector<std::string> idleWorkers;
246 
247  // all mirrors without a existing worker
248  std::vector<zypp::Url> mirrsWithoutWorker;
249  for ( const auto &url : item->urls() ) {
250 
251  if ( effectiveScheme( url.getScheme() ) != scheme ) {
252  MIL << "Mirror URL " << url << " is incompatible with current scheme: " << scheme << ", ignoring." << std::endl;
253  continue;
254  }
255 
256  if( item->owner()->canRedirectTo( item, url ) )
257  mirrsWithoutWorker.push_back( url );
258  else {
259  MIL_PRV << "URL was rejected" << url << std::endl;
260  }
261  }
262 
263  // at this point the list contains all useable mirrors, if this list is empty the request needs to fail
264  if( mirrsWithoutWorker.size() == 0 ) {
265  MIL << "Request has NO usable URLs" << std::endl;
266  if ( item->owner() )
267  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("No usable URLs in request spec.")) );
268  i = queue.erase(i);
269  continue;
270  }
271 
272 
273  for ( auto &[ queueName, workerQueue ] : _workerQueues ) {
274  if ( ProvideQueue::Config::Downloading != workerQueue->workerConfig().worker_type() )
275  continue;
276 
277  existingTypeWorkers ++;
278  existingConnections += workerQueue->activeRequests();
279 
280  if ( workerQueue->isIdle() )
281  idleWorkers.push_back (queueName);
282 
283  if ( !zypp::str::startsWith( queueName, scheme ) )
284  continue;
285 
286  for ( auto i = mirrsWithoutWorker.begin (); i != mirrsWithoutWorker.end(); ) {
287  const auto &u = *i;
288  if ( u.getHost() == workerQueue->hostname() ) {
289  if ( workerQueue->requestCount() < constants::DEFAULT_ACTIVE_CONN_PER_HOST )
290  possibleHostWorkers.push_back( {u, workerQueue.get()} );
291  i = mirrsWithoutWorker.erase( i );
292  // we can not stop after removing the first hit, since there could be multiple mirrors with the same hostname
293  } else {
294  ++i;
295  }
296  }
297  }
298 
299  if( provideDebugEnabled() ) {
300  MIL << "Current stats: " << std::endl;
301  MIL << "Existing type workers: " << existingTypeWorkers << std::endl;
302  MIL << "Existing active connections: " << existingConnections << std::endl;
303  MIL << "Possible host workers: "<< possibleHostWorkers.size() << std::endl;
304  MIL << "Mirrors without worker: " << mirrsWithoutWorker.size() << std::endl;
305  }
306 
307  // need to wait for requests to finish in order to schedule more requests
308  if ( existingConnections >= constants::DEFAULT_ACTIVE_CONN ) {
309  MIL_PRV << "Reached maximum nr of connections, break" << std::endl;
310  break;
311  }
312 
313  // if no workers are running, take the first mirror and start a worker for it
314  // if < nr of workers are running, use a mirror we do not have a conn yet to
315  if ( existingTypeWorkers < constants::DEFAULT_MAX_DYNAMIC_WORKERS
316  && mirrsWithoutWorker.size() ) {
317 
318  MIL_PRV << "Free worker slots and available mirror URLs, starting a new worker" << std::endl;
319 
320  //@TODO out of the available mirrors use the best one based on statistics ( if available )
321  bool found = false;
322  for( const auto &url : mirrsWithoutWorker ) {
323 
324  // mark this URL as used now, in case the queue can not be started we won't try it anymore
325  if ( !item->owner()->safeRedirectTo ( item, url ) )
326  continue;
327 
328  ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
329  if ( !q->startup( scheme, _workDir / scheme / url.getHost(), url.getHost() ) ) {
330  break;
331  } else {
332 
333  MIL_PRV << "Started worker for " << url.getHost() << " enqueing request" << std::endl;
334 
335  item->setActiveUrl(url);
336  found = true;
337 
338  std::string str = zypp::str::Format("%1%://%2%") % scheme % url.getHost();
339  _workerQueues[str] = q;
340  q->enqueue( item );
341  break;
342  }
343  }
344 
345  if( found ) {
346  i = queue.erase(i);
347  continue;
348  }
349  }
350 
351  // if we cannot start a new worker, find the best queue where we can push the item into
352  if ( possibleHostWorkers.size() ) {
353 
354  MIL_PRV << "No free worker slots, looking for the best existing worker" << std::endl;
355  bool found = false;
356  while( possibleHostWorkers.size () ) {
357  std::vector< std::pair<zypp::Url, ProvideQueue *> >::iterator candidate = possibleHostWorkers.begin();
358  for ( auto i = candidate+1; i != possibleHostWorkers.end(); i++ ) {
359  if ( i->second->activeRequests () < candidate->second->activeRequests () )
360  candidate = i;
361  }
362 
363  if ( !item->owner()->safeRedirectTo( item, candidate->first ) ) {
364  possibleHostWorkers.erase( candidate );
365  continue;
366  }
367 
368  MIL_PRV << "Using existing worker " << candidate->first.getHost() << " to download request" << std::endl;
369 
370  found = true;
371  item->setActiveUrl( candidate->first );
372  candidate->second->enqueue( item );
373  break;
374  }
375 
376  if( found ) {
377  i = queue.erase(i);
378  continue;
379  }
380  }
381 
382  // if we reach this place all we can now try is to decommission idle queues and use the new slot to start
383  // a new worker
384  if ( idleWorkers.size() && mirrsWithoutWorker.size() ) {
385 
386  MIL_PRV << "No free worker slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
387 
388  auto candidate = findLaziestWorker( _workerQueues, idleWorkers );
389  if ( candidate != _workerQueues.end() ) {
390 
391  // for now we decomission the worker and start a new one, should we instead introduce a "reset" message
392  // that repurposes the worker to another hostname/workdir config?
393  _workerQueues.erase(candidate);
394 
395  //@TODO out of the available mirrors use the best one based on statistics ( if available )
396  bool found = false;
397  for( const auto &url : mirrsWithoutWorker ) {
398 
399  if ( !item->owner()->safeRedirectTo ( item, url ) )
400  continue;
401 
402  ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
403  if ( !q->startup( scheme, _workDir / scheme / url.getHost(), url.getHost() ) ) {
404  break;
405  } else {
406 
407  MIL_PRV << "Replaced worker for " << url.getHost() << ", enqueing request" << std::endl;
408 
409  item->setActiveUrl(url);
410  found = true;
411 
412  auto str = zypp::str::Format("%1%://%2%") % scheme % url.getHost();
413  _workerQueues[str] = q;
414  q->enqueue( item );
415  }
416  }
417 
418  if( found ) {
419  i = queue.erase(i);
420  continue;
421  }
422  }
423  }
424 
425  // if we reach here we skip over the item and try to schedule it again later
426  MIL_PRV << "End of line, deferring request for next try." << std::endl;
427  i++;
428 
429  }
430  } else if ( config.worker_type() == ProvideQueue::Config::CPUBound && !isSingleInstance ) {
431 
432  for( auto i = queue.begin (); i != queue.end(); ) {
433 
434  // this is the only place where we remove elements from the queue when the scheduling flag is active
435  // other code just nulls out requests in the queue if during scheduling items need to be removed
436  while ( i != queue.end() && !(*i) ) {
437  i = queue.erase(i);
438  }
439 
440  if ( i == queue.end() )
441  break;
442 
443  // make a real reference so it does not disappear when we remove it from the queue
444  ProvideRequestRef item = *i;
445 
446  // CPU bound queues do not support attaching via a AttachRequest, this is handled by simply providing the media verification files
447  // If we hit this code path, its a bug
448  if( item->code() == ProvideMessage::Code::Attach || item->code() == ProvideMessage::Code::Detach ) {
449  i = queue.erase(i);
450  if ( item->owner () )
451  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("CPU bound Queues do not support ProvideAttachSpecRef requests") ) );
452  continue;
453  }
454 
455  MIL_PRV << "Trying to schedule request: " << item->urls().front() << std::endl;
456 
457  // how many workers for this type do already exist
458  int existingTypeWorkers = 0;
459  int existingSchemeWorkers = 0;
460 
461  // all currently available possible queues for the request
462  std::vector< ProvideQueue* > possibleWorkers;
463 
464  // currently idle workers
465  std::vector<std::string> idleWorkers;
466 
467  // the URL we are going to use this time
468  zypp::Url url;
469 
470  //CPU bound queues do not spawn per mirrors, we use the first compatible URL
471  for ( const auto &tmpurl : item->urls() ) {
472  if ( effectiveScheme( tmpurl.getScheme() ) != scheme ) {
473  MIL << "Mirror URL " << tmpurl << " is incompatible with current scheme: " << scheme << ", ignoring." << std::endl;
474  continue;
475  }
476  url = tmpurl;
477  break;
478  }
479 
480  // at this point if the URL is empty the request needs to fail
481  if( !url.isValid() ) {
482  MIL << "Request has NO usable URLs" << std::endl;
483  if ( item->owner() )
484  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("No usable URLs in request spec.")) );
485  i = queue.erase(i);
486  continue;
487  }
488 
489  for ( auto &[ queueName, workerQueue ] : _workerQueues ) {
490 
491  if ( ProvideQueue::Config::CPUBound != workerQueue->workerConfig().worker_type() )
492  continue;
493 
494  const bool thisScheme = zypp::str::startsWith( queueName, scheme );
495 
496  existingTypeWorkers ++;
497  if ( thisScheme ) {
498  existingSchemeWorkers++;
499  if ( workerQueue->canScheduleMore() )
500  possibleWorkers.push_back(workerQueue.get());
501  }
502 
503  if ( workerQueue->isIdle() )
504  idleWorkers.push_back(queueName);
505  }
506 
507  if( provideDebugEnabled() ) {
508  MIL << "Current stats: " << std::endl;
509  MIL << "Existing type workers: " << existingTypeWorkers << std::endl;
510  MIL << "Possible CPU workers: "<< possibleWorkers.size() << std::endl;
511  }
512 
513  // first we use existing idle workers of the current type
514  if ( possibleWorkers.size() ) {
515  bool found = false;
516  for ( auto &w : possibleWorkers ) {
517  if ( w->isIdle() ) {
518  MIL_PRV << "Using existing idle worker to provide request" << std::endl;
519  // this is not really required because we are not doing redirect checks
520  item->owner()->redirectTo ( item, url );
521  item->setActiveUrl( url );
522  w->enqueue( item );
523  i = queue.erase(i);
524  found = true;
525  break;
526  }
527  }
528  if ( found )
529  continue;
530  }
531 
532  // we first start as many workers as we need before queueing more request to existing ones
533  if ( existingTypeWorkers < cpuLimit ) {
534 
535  MIL_PRV << "Free CPU slots, starting a new worker" << std::endl;
536 
537  // this is not really required because we are not doing redirect checks
538  item->owner()->redirectTo ( item, url );
539 
540  ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
541  if ( q->startup( scheme, _workDir / scheme ) ) {
542 
543  item->setActiveUrl(url);
544 
545  auto str = zypp::str::Format("%1%#%2%") % scheme % existingSchemeWorkers;
546  _workerQueues[str] = q;
547  q->enqueue( item );
548  i = queue.erase(i);
549  continue;
550  } else {
551  // CPU bound requests can not recover from this error
552  i = queue.erase(i);
553  if ( item->owner() )
554  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Unable to start worker for request.") ) );
555  continue;
556  }
557  }
558 
559  // we can not start more workers, all we can do now is fill up queues of existing ones
560  if ( possibleWorkers.size() ) {
561  MIL_PRV << "No free CPU slots, looking for the best existing worker" << std::endl;
562 
563  if( possibleWorkers.size () ) {
564  std::vector<ProvideQueue *>::iterator candidate = possibleWorkers.begin();
565  for ( auto i = candidate+1; i != possibleWorkers.end(); i++ ) {
566  if ( (*i)->activeRequests () < (*candidate)->activeRequests () )
567  candidate = i;
568  }
569 
570  // this is not really required because we are not doing redirect checks
571  item->owner()->redirectTo ( item, url );
572 
573  MIL_PRV << "Using existing worker to provide request" << std::endl;
574  item->setActiveUrl( url );
575  (*candidate)->enqueue( item );
576  i = queue.erase(i);
577  continue;
578  }
579  }
580 
581  // if we reach this place all we can now try is to decomission idle queues and use the new slot to start
582  // a new worker
583  if ( idleWorkers.size() ) {
584 
585  MIL_PRV << "No free CPU slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
586 
587  auto candidate = findLaziestWorker( _workerQueues, idleWorkers );
588  if ( candidate != _workerQueues.end() ) {
589 
590  _workerQueues.erase(candidate);
591 
592  // this is not really required because we are not doing redirect checks
593  item->owner()->redirectTo ( item, url );
594 
595  ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
596  if ( q->startup( scheme, _workDir / scheme ) ) {
597 
598  MIL_PRV << "Replaced worker, enqueing request" << std::endl;
599 
600  item->setActiveUrl(url);
601 
602  auto str = zypp::str::Format("%1%#%2%") % scheme % ( existingSchemeWorkers + 1 );
603  _workerQueues[str] = q;
604  q->enqueue( item );
605  i = queue.erase(i);
606  continue;
607  } else {
608  // CPU bound requests can not recover from this error
609  i = queue.erase(i);
610  if ( item->owner() )
611  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Unable to start worker for request.") ) );
612  continue;
613  }
614  }
615  } else {
616  MIL_PRV << "No idle workers and no free CPU spots, wait for the next schedule run" << std::endl;
617  break;
618  }
619 
620  // if we reach here we skip over the item and try to schedule it again later
621  MIL_PRV << "End of line, deferring request for next try." << std::endl;
622  i++;
623  }
624 
625  } else {
626  // either SingleInstance worker or Mounting/VolatileMounting
627 
628  for( auto i = queue.begin (); i != queue.end(); ) {
629 
630  // this is the only place where we remove elements from the queue when the scheduling flag is active
631  // other code just nulls out requests in the queue if during scheduling items need to be removed
632  while ( i != queue.end() && !(*i) ) {
633  i = queue.erase(i);
634  }
635 
636  if ( i == queue.end() )
637  break;
638 
639  // make a real reference so it does not disappear when we remove it from the queue
640  ProvideRequestRef item = *i;
641  MIL_PRV << "Trying to schedule request: " << item->urls().front() << std::endl;
642 
643  zypp::Url url;
644 
645  //mounting queues do not spawn per mirrors, we use the first compatible URL
646  for ( const auto &tmpurl : item->urls() ) {
647  if ( effectiveScheme( tmpurl.getScheme() ) != scheme ) {
648  MIL << "Mirror URL " << tmpurl << " is incompatible with current scheme: " << scheme << ", ignoring." << std::endl;
649  continue;
650  }
651  url = tmpurl;
652  break;
653  }
654 
655  // at this point if the URL is empty the request needs to fail
656  if( !url.isValid() ) {
657  MIL << "Request has NO usable URLs" << std::endl;
658  if ( item->owner() )
659  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("No usable URLs in request spec.")) );
660  i = queue.erase(i);
661  continue;
662  }
663 
664 
665  ProvideQueue *qToUse = nullptr;
666  if ( !_workerQueues.count(scheme) ) {
667  ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
668  if ( !q->startup( scheme, _workDir / scheme ) ) {
669  ERR << "Worker startup failed!" << std::endl;
670  // mounting/single instance requests can not recover from this error
671  i = queue.erase(i);
672 
673  if ( item->owner() )
674  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Unable to start worker for request.") ) );
675  continue;
676  }
677 
678  MIL_PRV << "Started worker, enqueing request" << std::endl;
679  qToUse = q.get();
680  _workerQueues[scheme] = q;
681  } else {
682  MIL_PRV << "Found worker, enqueing request" << std::endl;
683  qToUse = _workerQueues.at(scheme).get();
684  }
685 
686  // this is not really required because we are not doing redirect checks
687  item->owner()->redirectTo ( item, url );
688 
689  item->setActiveUrl(url);
690  qToUse->enqueue( item );
691  i = queue.erase(i);
692  }
693  }
694  }
695  }
696 
697  std::list<ProvideItemRef> &ProvidePrivate::items()
698  {
699  return _items;
700  }
701 
703  {
704  return _credManagerOptions;
705  }
706 
707  std::vector<AttachedMediaInfo> &ProvidePrivate::attachedMediaInfos()
708  {
709  return _attachedMediaInfos;
710  }
711 
712  expected<ProvideQueue::Config> ProvidePrivate::schemeConfig( const std::string &scheme )
713  {
714  if ( auto i = _schemeConfigs.find( scheme ); i != _schemeConfigs.end() ) {
715  return expected<ProvideQueue::Config>::success(i->second);
716  } else {
717  // we do not have the queue config yet, we need to start a worker to get one
718  ProvideQueue q( *this );
719  if ( !q.startup( scheme, _workDir / scheme ) ) {
720  return expected<ProvideQueue::Config>::error(ZYPP_EXCPT_PTR(zypp::media::MediaException("Failed to start worker to read scheme config.")));
721  }
722  auto newItem = _schemeConfigs.insert( std::make_pair( scheme, q.workerConfig() ));
723  return expected<ProvideQueue::Config>::success(newItem.first->second);
724  }
725  }
726 
727  std::optional<zypp::ManagedFile> ProvidePrivate::addToFileCache( const zypp::filesystem::Pathname &downloadedFile )
728  {
729  const auto &key = downloadedFile.asString();
730 
731  if ( !zypp::PathInfo(downloadedFile).isExist() ) {
732  _fileCache.erase ( key );
733  return {};
734  }
735 
736  auto i = _fileCache.insert( { key, FileCacheItem() } );
737  if ( !i.second ) {
738  // file did already exist in the cache, return the shared data
739  i.first->second._deathTimer.reset();
740  return i.first->second._file;
741  }
742 
743  i.first->second._file = zypp::ManagedFile( downloadedFile, zypp::filesystem::unlink );
744  return i.first->second._file;
745  }
746 
747  bool ProvidePrivate::isInCache ( const zypp::Pathname &downloadedFile ) const
748  {
749  const auto &key = downloadedFile.asString();
750  return (_fileCache.count(key) > 0);
751  }
752 
753  void ProvidePrivate::queueItem ( ProvideItemRef item )
754  {
755  _items.push_back( item );
757  }
758 
760  {
761  auto elem = std::find_if( _items.begin(), _items.end(), [item]( const auto &i){ return i.get() == item; } );
762  if ( elem != _items.end() ) {
763  if ( _isScheduling ) {
764  (*elem).reset();
765  } else {
766  _items.erase(elem);
767  }
768  }
769  }
770 
771  std::string ProvidePrivate::nextMediaId() const
772  {
773  zypp::AutoDispose rawStr( g_uuid_string_random (), g_free );
774  return zypp::str::asString ( rawStr.value() );
775  }
776 
777  AttachedMediaInfo &ProvidePrivate::addMedium(ProvideQueue::Config::WorkerType workerType, const zypp::Url &baseUrl, ProvideMediaSpec &spec )
778  {
779  auto str = nextMediaId();
780  MIL_PRV << "Generated new ID for media attachment: " << str << std::endl;
781  _attachedMediaInfos.push_back( AttachedMediaInfo{ std::move(str), {}, workerType, baseUrl, spec } );
782  return _attachedMediaInfos.back();
783  }
784 
785  AttachedMediaInfo &ProvidePrivate::addMedium(zypp::proto::Capabilities::WorkerType workerType, ProvideQueueWeakRef backingQueue, const std::string &id, const zypp::Url &baseUrl, ProvideMediaSpec &spec)
786  {
787  MIL_PRV << "New media attachment with id: " << id << std::endl;
788  _attachedMediaInfos.push_back( AttachedMediaInfo{ id, backingQueue, workerType, baseUrl, spec } );
789  return _attachedMediaInfos.back();
790  }
791 
792  bool ProvidePrivate::queueRequest ( ProvideRequestRef req )
793  {
794  const auto &schemeName = effectiveScheme( req->url().getScheme() );
795  auto existingQ = std::find_if( _queues.begin (), _queues.end(), [&schemeName]( const auto &qItem) {
796  return (qItem._schemeName == schemeName);
797  });
798  if ( existingQ != _queues.end() ) {
799  existingQ->_requests.push_back(req);
800  } else {
801  _queues.push_back( ProvidePrivate::QueueItem{ schemeName, {req} } );
802  }
803 
805  return true;
806  }
807 
808  bool ProvidePrivate::dequeueRequest(ProvideRequestRef req , std::exception_ptr error)
809  {
810  auto queue = req->currentQueue ();
811  if ( queue ) {
812  queue->cancel( req.get(), error );
813  return true;
814  } else {
815  // Request not started yet, search request queues
816  for ( auto &q : _queues ) {
817  auto elem = std::find( q._requests.begin(), q._requests.end(), req );
818  if ( elem != q._requests.end() ) {
819  q._requests.erase(elem);
820 
821  if ( req->owner() )
822  req->owner()->finishReq( nullptr, req, error );
823  return true;
824  }
825  }
826  }
827  return false;
828  }
829 
831  {
832  return _workerPath;
833  }
834 
835  const std::string ProvidePrivate::queueName( ProvideQueue &q ) const
836  {
837  for ( const auto &v : _workerQueues ) {
838  if ( v.second.get() == &q )
839  return v.first;
840  }
841  return {};
842  }
843 
845  {
846  return _isRunning;
847  }
848 
849  std::string ProvidePrivate::effectiveScheme(const std::string &scheme) const
850  {
851  const std::string &ss = zypp::str::stripSuffix( scheme, constants::ATTACHED_MEDIA_SUFFIX );
852  if ( auto it = _workerAlias.find ( ss ); it != _workerAlias.end () ) {
853  return it->second;
854  }
855  return ss;
856  }
857 
859  {
860  DBG_PRV << "Pulse timeout" << std::endl;
861 
862  auto now = std::chrono::steady_clock::now();
863 
864  if ( _log ) _log->pulse();
865 
866  // release old cache files
867  for ( auto i = _fileCache.begin (); i != _fileCache.end(); ) {
868  auto &cacheItem = i->second;
869  if ( cacheItem._file.unique() ) {
870  if ( cacheItem._deathTimer ) {
871  if ( now - *cacheItem._deathTimer < std::chrono::seconds(20) ) {
872  MIL << "Releasing file " << *i->second._file << " from cache, death timeout." << std::endl;
873  i = _fileCache.erase(i);
874  continue;
875  }
876  } else {
877  // start the death timeout
878  cacheItem._deathTimer = std::chrono::steady_clock::now();
879  }
880  }
881 
882  ++i;
883  }
884  }
885 
887  {
888  if ( !_items.empty() )
889  return;
890  for ( auto &[k,q] : _workerQueues ) {
891  if ( !q->empty() )
892  return;
893  }
894 
895  // all queues are empty
896  _sigIdle.emit();
897  }
898 
900  {
901  if ( item.state() == ProvideItem::Finished ) {
902  auto itemRef = item.shared_this<ProvideItem>();
903  auto i = std::find( _items.begin(), _items.end(), itemRef );
904  if ( i == _items.end() ) {
905  ERR << "State of unknown Item changed, ignoring" << std::endl;
906  return;
907  }
908  if ( _isScheduling )
909  i->reset();
910  else
911  _items.erase(i);
912  }
913  if ( _items.empty() )
914  onQueueIdle();
915  }
916 
918  {
919  //@TODO is it required to handle overflow?
920  return ++_nextRequestId;
921  }
922 
924  {
925  Data( Provide &parent, const std::string &hdl )
926  : _parent( parent.weak_this<Provide>() )
927  , _hdlName(hdl) { }
928 
929  ~Data() {
930  auto p = _parent.lock(); if (p) p->releaseMedia(_hdlName);
931  }
932 
933  ProvideWeakRef _parent;
934  std::string _hdlName;
935  };
936 
937  ProvideMediaHandle::ProvideMediaHandle( Provide &parent, const std::string &hdl )
938  : _ref( std::make_shared<ProvideMediaHandle::Data>( parent, hdl ) )
939  {}
940 
941  std::shared_ptr<Provide> ProvideMediaHandle::parent() const
942  {
943  return _ref->_parent.lock();
944  }
945 
947  {
948  return ( _ref.get() != nullptr );
949  }
950 
951  std::string ProvideMediaHandle::handle() const
952  {
953  return _ref->_hdlName;
954  }
955 
956 
957  Provide::Provide( const zypp::Pathname &workDir ) : Base( *new ProvidePrivate( zypp::Pathname(workDir), *this ) )
958  {
959  Z_D();
960  connect( *d->_pulseTimer, &Timer::sigExpired, *d, &ProvidePrivate::onPulseTimeout );
961  }
962 
963  ProvideRef Provide::create( const zypp::filesystem::Pathname &workDir )
964  {
965  return ProvideRef( new Provide(workDir) );
966  }
967 
968  AsyncOpRef<expected<Provide::MediaHandle>> Provide::attachMedia( const zypp::Url &url, const ProvideMediaSpec &request )
969  {
970  return attachMedia ( std::vector<zypp::Url>{url}, request );
971  }
972 
973  AsyncOpRef<expected<Provide::MediaHandle>> Provide::attachMedia( const std::vector<zypp::Url> &urls, const ProvideMediaSpec &request )
974  {
975  Z_D();
976 
977  if ( urls.empty() ) {
978  return makeReadyResult( expected<Provide::MediaHandle>::error( ZYPP_EXCPT_PTR( zypp::media::MediaException("No usable mirrors in mirrorlist."))) );
979  }
980 
981  // sanitize the mirrors to contain only URLs that have same worker types
982  std::vector<zypp::Url> usableMirrs;
983  std::optional<ProvideQueue::Config> scheme;
984 
985  for ( auto mirrIt = urls.begin() ; mirrIt != urls.end(); mirrIt++ ) {
986  const auto &s = d->schemeConfig( d->effectiveScheme( mirrIt->getScheme() ) );
987  if ( !s ) {
988  WAR << "URL: " << *mirrIt << " is not supported, ignoring!" << std::endl;
989  continue;
990  }
991  if ( !scheme ) {
992  scheme = *s;
993  usableMirrs.push_back ( *mirrIt );
994  } else {
995  if ( scheme->worker_type () == s->worker_type () ) {
996  usableMirrs.push_back( *mirrIt );
997  } else {
998  WAR << "URL: " << *mirrIt << " has different worker type than the primary URL: "<< usableMirrs.front() <<", ignoring!" << std::endl;
999  }
1000  }
1001  }
1002 
1003  if ( !scheme || usableMirrs.empty() ) {
1004  return makeReadyResult( expected<Provide::MediaHandle>::error( ZYPP_EXCPT_PTR ( zypp::media::MediaException("No valid mirrors available") )) );
1005  }
1006 
1007  // first check if there is a already attached medium we can use as well
1008  auto &attachedMedia = d->attachedMediaInfos ();
1009  for ( auto &medium : attachedMedia ) {
1010  if ( medium.isSameMedium ( usableMirrs, request ) ) {
1011  medium.ref();
1012  return makeReadyResult( expected<Provide::MediaHandle>::success( Provide::MediaHandle( *this, medium._name) ) );
1013  }
1014  }
1015 
1016  auto op = AttachMediaItem::create( usableMirrs, request, *d_func() );
1017  d->queueItem (op);
1018  return op->promise();
1019  }
1020 
1021  void Provide::releaseMedia( const std::string &mediaRef )
1022  {
1023  Z_D();
1024 
1025  if ( mediaRef.empty() )
1026  return;
1027 
1028  const auto i = std::find_if( d->_attachedMediaInfos.begin(), d->_attachedMediaInfos.end(), [&]( const auto &info ){ return info._name == mediaRef;} );
1029  if ( i == d->_attachedMediaInfos.end() ) {
1030  ERR << "Unknown media attach handle" << std::endl;
1031  return;
1032  }
1033 
1034  // only unref'ing here, the scheduler will generate a message to the queues if needed
1035  i->unref();
1036  }
1037 
1038  AsyncOpRef< expected<ProvideRes> > Provide::provide( const std::vector<zypp::Url> &urls, const ProvideFileSpec &request )
1039  {
1040  Z_D();
1041  auto op = ProvideFileItem::create( urls, request, *d );
1042  d->queueItem (op);
1043  return op->promise();
1044  }
1045 
1046  AsyncOpRef< expected<ProvideRes> > Provide::provide( const zypp::Url &url, const ProvideFileSpec &request )
1047  {
1048  return provide( std::vector<zypp::Url>{ url }, request );
1049  }
1050 
1051  AsyncOpRef< expected<ProvideRes> > Provide::provide( const MediaHandle &attachHandle, const zypp::Pathname &fileName, const ProvideFileSpec &request )
1052  {
1053  Z_D();
1054  const auto i = std::find_if( d->_attachedMediaInfos.begin(), d->_attachedMediaInfos.end(), [&]( const auto &info ){ return info._name == attachHandle.handle();} );
1055  if ( i == d->_attachedMediaInfos.end() ) {
1056  return makeReadyResult( expected<ProvideRes>::error( ZYPP_EXCPT_PTR( zypp::media::MediaException("Invalid attach handle")) ) );
1057  }
1058 
1059  // for downloading items we need to make the baseUrl part of the request URL
1060  zypp::Url url = i->_attachedUrl;
1061 
1062  // real mount devices use a ID to reference a attached medium, for those we do not need to send the baseUrl as well since its already
1063  // part of the mount point, so if we mount host:/path/to/repo to the ID 1234 and look for the file /path/to/repo/file1 the request URL will look like: nfs-media://1234/file1
1064  if ( i->_workerType == ProvideQueue::Config::SimpleMount || i->_workerType == ProvideQueue::Config::VolatileMount ) {
1065  url = zypp::Url();
1066  // work around the zypp::Url requirements for certain Url schemes by attaching a suffix, that way we are always able to have a authority
1067  url.setScheme( i->_attachedUrl.getScheme() + std::string(constants::ATTACHED_MEDIA_SUFFIX) );
1068  url.setAuthority( i->_name );
1069  url.setPathName("/");
1070  }
1071 
1072  url.appendPathName( fileName );
1073  auto op = ProvideFileItem::create( {url}, request, *d );
1074 
1075  i->ref();
1076  op->setMediaRef( MediaHandle( *this, i->_name ));
1077 
1078  d->queueItem (op);
1079  return op->promise();
1080  }
1081 
1082  AsyncOpRef<expected<std::string>> Provide::checksumForFile ( const zypp::Pathname &p, const std::string &algorithm )
1083  {
1084  using namespace zyppng::operators;
1085 
1086  zypp::Url url("chksum:///");
1087  url.setPathName( p );
1088  auto fut = provide( url, zyppng::ProvideFileSpec().setCustomHeaderValue( "chksumType", algorithm ) )
1089  | mbind( [algorithm]( zyppng::ProvideRes &&chksumRes ) {
1090  if ( chksumRes.headers().contains(algorithm) )
1091  return expected<std::string>::success( chksumRes.headers().value(algorithm).asString() );
1092  return expected<std::string>::error( ZYPP_EXCPT_PTR( zypp::FileCheckException("Invalid/Empty checksum returned from worker") ) );
1093  } );
1094  return fut;
1095  }
1096 
1097  AsyncOpRef<expected<zypp::ManagedFile>> Provide::copyFile ( const zypp::Pathname &source, const zypp::Pathname &target )
1098  {
1099  using namespace zyppng::operators;
1100 
1101  zypp::Url url("copy:///");
1102  url.setPathName( source );
1103  auto fut = provide( url, ProvideFileSpec().setDestFilenameHint( target ))
1104  | mbind( [&]( ProvideRes &&copyRes) {
1105  return expected<zypp::ManagedFile>::success( copyRes.asManagedFile() );
1106  } );
1107  return fut;
1108  }
1109 
1111  {
1112  Z_D();
1113  d->_isRunning = true;
1114  d->_pulseTimer->start( 5000 );
1115  d->schedule( ProvidePrivate::ProvideStart );
1116  if ( d->_log ) d->_log->provideStart();
1117  }
1118 
1120  {
1121  d_func()->_workerPath = path;
1122  }
1123 
// As listed here the branch taken for a non-empty \a queueRef contains no
// statements, so every call returns false and \a device is never used.
// NOTE(review): listing line 1127 is missing from this extract; the branch
// presumably held the actual eject call - confirm against the upstream source.
1124  bool Provide::ejectDevice(const std::string &queueRef, const std::string &device)
1125  {
1126  if ( !queueRef.empty() ) {
1128  }
1129  return false;
1130  }
1131 
1132  void Provide::setStatusTracker( ProvideStatusRef tracker )
1133  {
1134  d_func()->_log = tracker;
1135  }
1136 
1138  {
1139  return d_func()->_workDir;
1140  }
1141 
1143  {
1144  Z_D();
1145  return d->_credManagerOptions;
1146  }
1147 
1149  {
1150  d_func()->_credManagerOptions = opt;
1151  }
1152 
1153  SignalProxy<void ()> Provide::sigIdle()
1154  {
1155  return d_func()->_sigIdle;
1156  }
1157 
1158  SignalProxy<Provide::MediaChangeAction ( const std::string &queueRef, const std::string &, const int32_t, const std::vector<std::string> &, const std::optional<std::string> &)> Provide::sigMediaChangeRequested()
1159  {
1160  return d_func()->_sigMediaChange;
1161  }
1162 
1163  SignalProxy< std::optional<zypp::media::AuthData> ( const zypp::Url &reqUrl, const std::string &triedUsername, const std::map<std::string, std::string> &extraValues ) > Provide::sigAuthRequired()
1164  {
1165  return d_func()->_sigAuthRequired;
1166  }
1167 
1169 
1170  ProvideStatus::ProvideStatus( ProvideRef parent )
1171  : _provider( parent )
1172  { }
1173 
1175  {
1176  _stats = Stats();
1177  _stats._startTime = std::chrono::steady_clock::now();
1178  _stats._lastPulseTime = std::chrono::steady_clock::now();
1179  }
1180 
1182  {
1183  const auto &sTime = item.startTime();
1184  const auto &fTime = item.finishedTime();
1185  if ( sTime > sTime.min() && fTime >= sTime ) {
1186  auto duration = std::chrono::duration_cast<std::chrono::seconds>( item.finishedTime() - item.startTime() );
1187  if ( duration.count() )
1188  MIL << "Item finished after " << duration.count() << " seconds, with " << zypp::ByteCount( item.currentStats()->_bytesProvided.operator zypp::ByteCount::SizeType() / duration.count() ) << "/s" << std::endl;
1189  MIL << "Item finished after " << (item.finishedTime() - item.startTime()).count() << " ns" << std::endl;
1190  }
1191  pulse( );
1192  }
1193 
1195  {
1196  MIL << "Item failed" << std::endl;
1197  }
1198 
1200  {
1201  return _stats;
1202  }
1203 
1205  {
1206  auto prov = _provider.lock();
1207  if ( !prov )
1208  return;
1209 
1210  const auto lastFinishedBytes = _stats._finishedBytes;
1211  const auto lastPartialBytes = _stats._partialBytes;
1212  _stats._expectedBytes = _stats._finishedBytes; // finished bytes are expected too!
1213  zypp::ByteCount tmpPartialBytes (0); // bytes that are finished in staging, but not commited to cache yet
1214 
1215  for ( const auto &i : prov->d_func()->items() ) {
1216 
1217  if ( !i // maybe released during scheduling
1218  || i->state() == ProvideItem::Cancelling )
1219  continue;
1220 
1221  if ( i->state() == ProvideItem::Uninitialized
1222  || i->state() == ProvideItem::Pending ) {
1223  _stats._expectedBytes += i->bytesExpected();
1224  continue;
1225  }
1226 
1227  i->pulse();
1228 
1229  const auto & stats = i->currentStats();
1230  const auto & prevStats = i->previousStats();
1231  if ( !stats || !prevStats ) {
1232  ERR << "Bug! Stats should be initialized by now" << std::endl;
1233  continue;
1234  }
1235 
1236  if ( i->state() == ProvideItem::Downloading
1237  || i->state() == ProvideItem::Processing
1238  || i->state() == ProvideItem::Finalizing ) {
1239  _stats._expectedBytes += stats->_bytesExpected;
1240  tmpPartialBytes += stats->_bytesProvided;
1241  } else if ( i->state() == ProvideItem::Finished ) {
1242  _stats._finishedBytes += stats->_bytesProvided; // remember those bytes are finished in stats directly
1243  _stats._expectedBytes += stats->_bytesProvided;
1244  }
1245  }
1246 
1247  const auto now = std::chrono::steady_clock::now();
1248  const auto sinceLast = std::chrono::duration_cast<std::chrono::seconds>( now - _stats._lastPulseTime );
1249  const auto lastFinB = lastPartialBytes + lastFinishedBytes;
1250  const auto currFinB = tmpPartialBytes + _stats._finishedBytes;
1251 
1252  const auto diff = currFinB - lastFinB;
1253  _stats._lastPulseTime = now;
1254  _stats._partialBytes = tmpPartialBytes;
1255 
1256  if ( sinceLast >= std::chrono::seconds(1) )
1257  _stats._perSecondSinceLastPulse = ( diff / ( sinceLast.count() ) );
1258 
1259  auto sinceStart = std::chrono::duration_cast<std::chrono::seconds>( _stats._lastPulseTime - _stats._startTime );
1260  if ( sinceStart.count() ) {
1261  const size_t diff = _stats._finishedBytes + _stats._partialBytes;
1262  _stats._perSecond = zypp::ByteCount( diff / sinceStart.count() );
1263  }
1264  }
1265 }
std::string getScheme() const
Returns the scheme name of the URL.
Definition: Url.cc:533
constexpr auto DEFAULT_MAX_DYNAMIC_WORKERS
Definition: provide_p.h:41
#define MIL
Definition: Logger.h:96
SignalProxy< void()> sigIdle()
Definition: provide.cc:1153
const zypp::Pathname & providerWorkdir() const
Definition: provide.cc:1137
zypp::ByteCount _finishedBytes
Definition: provide.h:72
static ProvideFileItemRef create(const std::vector< zypp::Url > &urls, const ProvideFileSpec &request, ProvidePrivate &parent)
Definition: provideitem.cc:564
Unit::ValueType SizeType
Definition: ByteCount.h:37
std::optional< zypp::ManagedFile > addToFileCache(const zypp::Pathname &downloadedFile)
Definition: provide.cc:727
bool queueRequest(ProvideRequestRef req)
Definition: provide.cc:792
ProvideStatusRef _log
Definition: provide_p.h:153
ZYPP_IMPL_PRIVATE(Provide)
Pathname realpath() const
Returns this path as the absolute canonical pathname.
Definition: Pathname.cc:231
std::vector< AttachedMediaInfo > _attachedMediaInfos
Definition: provide_p.h:139
AsyncOpRef< expected< ProvideRes > > provide(const std::vector< zypp::Url > &urls, const ProvideFileSpec &request)
Definition: provide.cc:1038
const zypp::media::CredManagerOptions & credManangerOptions() const
Definition: provide.cc:1142
virtual void pulse()
Definition: provide.cc:1204
ProvideStatus(ProvideRef parent)
Definition: provide.cc:1170
std::list< ProvideItemRef > _items
Definition: provide_p.h:129
void appendPathName(const Pathname &path_r, EEncoding eflag_r=zypp::url::E_DECODED)
Extend the path name.
Definition: Url.cc:786
Store and operate with byte count.
Definition: ByteCount.h:30
std::shared_ptr< Data > _ref
Definition: provide.h:55
Signal< void()> _sigIdle
Definition: provide_p.h:154
A ProvideRes object is a reference counted ownership of a resource in the cache provided by a Provide...
Definition: provideres.h:35
const zypp::Pathname & workerPath() const
Definition: provide.cc:830
virtual void itemFailed(ProvideItem &item)
Definition: provide.cc:1194
std::string nextMediaId() const
Definition: provide.cc:771
std::string stripSuffix(const C_Str &str_r, const C_Str &suffix_r)
Strip a suffix_r from str_r and return the resulting string.
Definition: String.h:1048
String related utilities and Regular expression matching.
void enqueue(ProvideRequestRef request)
Definition: providequeue.cc:92
Data(Provide &parent, const std::string &hdl)
Definition: provide.cc:925
AsyncOpRef< expected< std::string > > checksumForFile(const zypp::Pathname &p, const std::string &algorithm)
Definition: provide.cc:1082
bool provideDebugEnabled()
Definition: providedbg_p.h:28
const std::string & asString(const std::string &t)
Global asString() that works with std::string too.
Definition: String.h:139
Definition: Arch.h:363
std::unordered_map< std::string, ProvideQueue::Config > _schemeConfigs
Definition: provide_p.h:142
AutoDispose< const Pathname > ManagedFile
A Pathname plus associated cleanup code to be executed when path is no longer needed.
Definition: ManagedFile.h:27
uint32_t nextRequestId()
Definition: provide.cc:917
zypp::Pathname _workerPath
Definition: provide_p.h:150
std::chrono::time_point< std::chrono::steady_clock > TimePoint
static ProvideRef create(const zypp::Pathname &workDir="")
Definition: provide.cc:963
Timer::Ptr _scheduleTrigger
Definition: provide_p.h:126
Convenient building of std::string with boost::format.
Definition: String.h:252
#define L_ENV_CONSTR_DEFINE_FUNC(ENV)
Definition: Logger.h:113
zypp::ByteCount _perSecond
Definition: provide.h:76
std::chrono::steady_clock::time_point _lastPulseTime
Definition: provide.h:69
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition: Exception.h:432
State state() const
Definition: provideitem.cc:494
void setAuthority(const std::string &authority)
Set the authority component in the URL.
Definition: Url.cc:698
#define ERR
Definition: Logger.h:98
void setCredManagerOptions(const zypp::media::CredManagerOptions &opt)
Definition: provide.cc:1148
Assign a variable a certain value when going out of scope.
Definition: dtorreset.h:49
bool isInCache(const zypp::Pathname &downloadedFile) const
Definition: provide.cc:747
bool empty() const
Test for an empty path.
Definition: Pathname.h:114
void setPathName(const std::string &path, EEncoding eflag=zypp::url::E_DECODED)
Set the path name.
Definition: Url.cc:764
expected< ProvideQueue::Config > schemeConfig(const std::string &scheme)
Definition: provide.cc:712
ProvidePrivate(zypp::Pathname &&workDir, Provide &pub)
Definition: provide.cc:21
std::vector< AttachedMediaInfo > & attachedMediaInfos()
Definition: provide.cc:707
std::deque< QueueItem > _queues
Definition: provide_p.h:136
void onItemStateChanged(ProvideItem &item)
Definition: provide.cc:899
std::list< ProvideItemRef > & items()
Definition: provide.cc:697
SignalProxy< MediaChangeAction(const std::string &queueRef, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc)> sigMediaChangeRequested()
Definition: provide.cc:1158
void setScheme(const std::string &scheme)
Set the scheme name in the URL.
Definition: Url.cc:668
int unlink(const Pathname &path)
Like 'unlink'.
Definition: PathInfo.cc:700
const std::string & asString() const
String representation.
Definition: Pathname.h:91
const Config & workerConfig() const
Just inherits Exception to separate media exceptions.
zypp::ByteCount _expectedBytes
Definition: provide.h:73
virtual void itemDone(ProvideItem &item)
Definition: provide.cc:1181
bool ejectDevice(const std::string &queueRef, const std::string &device)
Definition: provide.cc:1124
void setWorkerPath(const zypp::Pathname &path)
Definition: provide.cc:1119
#define WAR
Definition: Logger.h:97
ProvideWeakRef _provider
Definition: provide.h:97
#define nullptr
Definition: Easy.h:55
void onPulseTimeout(Timer &)
Definition: provide.cc:858
const std::optional< ItemStats > & currentStats() const
Definition: provideitem.cc:122
bool isRunning() const
Definition: provide.cc:844
bool startsWith(const C_Str &str_r, const C_Str &prefix_r)
alias for hasPrefix
Definition: String.h:1085
ProvideMediaHandle MediaHandle
Definition: provide.h:109
void schedule(ScheduleReason reason)
Definition: provide.cc:38
zypp::Pathname _workDir
Definition: provide_p.h:127
virtual void provideStart()
Definition: provide.cc:1174
constexpr auto DEFAULT_CPU_WORKERS
Definition: provide_p.h:42
zypp::ByteCount _partialBytes
Definition: provide.h:74
bool isValid() const
Verifies the Url.
Definition: Url.cc:489
constexpr auto DEFAULT_ACTIVE_CONN_PER_HOST
Definition: provide_p.h:39
const std::string queueName(ProvideQueue &q) const
Definition: provide.cc:835
void dequeueItem(ProvideItem *item)
Definition: provide.cc:759
Provide(const zypp::Pathname &workDir)
Definition: provide.cc:957
#define MIL_PRV
Definition: providedbg_p.h:35
zypp::media::CredManagerOptions _credManagerOptions
Definition: provide_p.h:151
bool startup(const std::string &workerScheme, const zypp::Pathname &workDir, const std::string &hostname="")
Definition: providequeue.cc:57
constexpr std::string_view DEFAULT_PROVIDE_WORKER_PATH
Definition: provide_p.h:37
std::unordered_map< std::string, FileCacheItem > _fileCache
Definition: provide_p.h:148
bool dequeueRequest(ProvideRequestRef req, std::exception_ptr error)
Definition: provide.cc:808
AsyncOpRef< expected< MediaHandle > > attachMedia(const std::vector< zypp::Url > &urls, const ProvideMediaSpec &request)
Definition: provide.cc:973
Base class for Exception.
Definition: Exception.h:145
constexpr auto DEFAULT_ACTIVE_CONN
Definition: provide_p.h:40
static AttachMediaItemRef create(const std::vector< zypp::Url > &urls, const ProvideMediaSpec &request, ProvidePrivate &parent)
void queueItem(ProvideItemRef item)
Definition: provide.cc:753
reference value() const
Reference to the Tp object.
Definition: AutoDispose.h:147
#define DBG_PRV
Definition: providedbg_p.h:34
zypp::media::CredManagerOptions & credManagerOptions()
Definition: provide.cc:702
static expected< ProvideRequestRef > createDetach(const zypp::Url &url)
Definition: provideitem.cc:75
zypp::ByteCount _perSecondSinceLastPulse
Definition: provide.h:75
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
Definition: AutoDispose.h:93
void setStatusTracker(ProvideStatusRef tracker)
Definition: provide.cc:1132
std::unordered_map< std::string, std::string > _workerAlias
Definition: provide_p.h:111
std::chrono::steady_clock::time_point _startTime
Definition: provide.h:68
const Stats & stats() const
Definition: provide.cc:1199
AsyncOpRef< expected< zypp::ManagedFile > > copyFile(const zypp::Pathname &source, const zypp::Pathname &target)
Definition: provide.cc:1097
Wrapper class for ::stat/::lstat.
Definition: PathInfo.h:220
constexpr std::string_view device("device")
static bool openTray(const std::string &device_r)
Definition: cdtools.cc:33
std::string handle() const
Definition: provide.cc:951
AttachedMediaInfo & addMedium(zypp::proto::Capabilities::WorkerType workerType, const zypp::Url &baseUrl, ProvideMediaSpec &spec)
void releaseMedia(const std::string &mediaRef)
Definition: provide.cc:1021
Easy-to use interface to the ZYPP dependency resolver.
Definition: CodePitfalls.doc:1
void doSchedule(Timer &)
Definition: provide.cc:70
std::unordered_map< std::string, ProvideQueueRef > _workerQueues
Definition: provide_p.h:141
std::string effectiveScheme(const std::string &scheme) const
Definition: provide.cc:849
std::shared_ptr< Provide > parent() const
Definition: provide.cc:941
virtual std::chrono::steady_clock::time_point startTime() const
Definition: provideitem.cc:132
constexpr std::string_view ATTACHED_MEDIA_SUFFIX
Definition: provide_p.h:38
Url manipulation class.
Definition: Url.h:91
virtual std::chrono::steady_clock::time_point finishedTime() const
Definition: provideitem.cc:137
#define DBG
Definition: Logger.h:95
SignalProxy< std::optional< zypp::media::AuthData >(const zypp::Url &reqUrl, const std::string &triedUsername, const std::map< std::string, std::string > &extraValues) > sigAuthRequired()
Definition: provide.cc:1163