libzypp  17.31.31
networkrequestdispatcher.cc
Go to the documentation of this file.
1 /*---------------------------------------------------------------------\
2 | ____ _ __ __ ___ |
3 | |__ / \ / / . \ . \ |
4 | / / \ V /| _/ _/ |
5 | / /__ | | | | | | |
6 | /_____||_| |_| |_| |
7 | |
8 ----------------------------------------------------------------------*/
9 #include <zypp/APIConfig.h>
14 #include <zypp-core/zyppng/base/Timer>
15 #include <zypp-core/zyppng/base/SocketNotifier>
16 #include <zypp-core/zyppng/base/EventDispatcher>
18 #include <assert.h>
19 
20 #include <zypp/base/Logger.h>
21 #include <zypp/base/String.h>
22 #include <zypp-core/base/DtorReset>
23 
24 using namespace boost;
25 
27 
28 
29 namespace zyppng {
30 
31 static const std::string & defaultAgentString()
32 {
33  // we need to add the release and identifier to the
34  // agent string.
35  // The target could be not initialized, and then this information
36  // is guessed.
37  static const std::string _value(
39  "ZYpp " LIBZYPP_VERSION_STRING " (curl %s)"
40  , curl_version_info(CURLVERSION_NOW)->version
41  )
42  );
43  return _value;
44 }
45 
46 
// Constructs the private dispatcher state: creates the timer and the curl multi
// handle, stores the default agent string and registers the static timer and
// socket callbacks on the multi handle with `this` as their user data.
47 NetworkRequestDispatcherPrivate::NetworkRequestDispatcherPrivate( NetworkRequestDispatcher &p )
48  : BasePrivate( p )
49  , _timer( Timer::create() )
50  , _multi ( curl_multi_init() )
51  , _userAgent( defaultAgentString() )
52 {
// NOTE(review): listing line 53 is absent from this extract; the symbol index at the
// end of the listing mentions globalInitCurlOnce() - confirm against the original file.
54 
// curl hands `this` back to multi_timer_cb / static_socket_callback as the user pointer
55  curl_multi_setopt( _multi, CURLMOPT_TIMERFUNCTION, NetworkRequestDispatcherPrivate::multi_timer_cb );
56  curl_multi_setopt( _multi, CURLMOPT_TIMERDATA, reinterpret_cast<void *>( this ) );
57  curl_multi_setopt( _multi, CURLMOPT_SOCKETFUNCTION, NetworkRequestDispatcherPrivate::static_socket_callback );
58  curl_multi_setopt( _multi, CURLMOPT_SOCKETDATA, reinterpret_cast<void *>( this ) );
59 
60  // disabled explicit pipelining since it breaks our tests on releases < 15.2
61  // we could consider enabling it starting with a specific CURL version
62  // curl_multi_setopt( _multi, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX|CURLPIPE_HTTP1 );
63 
// the timer is re-armed explicitly from multi_timer_cb, so it only fires once per start
64  _timer->setSingleShot( true );
65  _timer->connect( &Timer::sigExpired, *this, &NetworkRequestDispatcherPrivate::multiTimerTimout );
66 }
67 
// NOTE(review): the signature line (listing line 68) and listing line 70 are absent
// from this extract; presumably this is the NetworkRequestDispatcherPrivate
// destructor - confirm against the original file.
69 {
// releases the multi handle obtained via curl_multi_init() in the constructor
71  curl_multi_cleanup( _multi );
72 }
73 
74 //called by curl to setup a timer
75 int NetworkRequestDispatcherPrivate::multi_timer_cb( CURLM *, long timeout_ms, void *thatPtr )
76 {
77  NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( thatPtr );
78  assert( that != nullptr );
79 
80  if ( timeout_ms >= 0 ) {
81  that->_timer->start( static_cast<uint64_t>(timeout_ms) );
82  } else {
83  //cancel the timer
84  that->_timer->stop();
85  }
86  return 0;
87 }
88 
// NOTE(review): the signature line (listing line 89) is absent from this extract;
// presumably this is multiTimerTimout, the slot the constructor connects to
// Timer::sigExpired - confirm against the original file.
90 {
// drive curl with the timeout pseudo socket and an empty event mask
91  handleMultiSocketAction( CURL_SOCKET_TIMEOUT, 0 );
92 }
93 
94 int NetworkRequestDispatcherPrivate::static_socket_callback(CURL * easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp )
95 {
96  NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( userp );
97  assert( that != nullptr );
98  return that->socketCallback( easy, s, what, socketp );
99 }
100 
101 int NetworkRequestDispatcherPrivate::socketCallback(CURL *easy, curl_socket_t s, int what, void * )
102 {
103  std::shared_ptr<SocketNotifier> socketp;
104 
105  if ( _socketHandler.count( s ) == 0 ) {
106  if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
107  return 0;
108 
109  socketp = SocketNotifier::create( s, SocketNotifier::Read, false );
110  _socketHandler.insert( std::make_pair( s, socketp ) );
111 
112  socketp->connect( &SocketNotifier::sigActivated, *this, &NetworkRequestDispatcherPrivate::onSocketActivated );
113  } else {
114  socketp = _socketHandler[s];
115  }
116 
117  //should never happen
118  if ( !socketp ) {
119  if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
120  return 0;
121 
122  if ( _socketHandler.count( s ) > 0 )
123  _socketHandler.erase( s );
124 
125  void *privatePtr = nullptr;
126  if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
127  privatePtr = nullptr; //make sure this was not filled with bad info
128  }
129 
130  if ( privatePtr ) {
131  NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
132  //we stop the download, if we can not listen for socket changes we can not correctly do anything
133  setFinished( *request->z_func(), NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, "Unable to assign socket listener." ) );
134  return 0;
135  } else {
136  //a broken handle without anything assigned, also should never happen but make sure and clean it up
137  WAR << "Cleaning up unassigned easy handle" << std::endl;
138  curl_multi_remove_handle( _multi, easy );
139  curl_easy_cleanup( easy );
140  return 0;
141  }
142  }
143 
144  //remove the socket
145  if ( what == CURL_POLL_REMOVE ) {
146  socketp->setEnabled( false );
147  _socketHandler.erase( s );
148  return 0;
149  }
150 
151  if ( what == CURL_POLL_IN ) {
152  socketp->setMode( SocketNotifier::Read );
153  } else if ( what == CURL_POLL_OUT ) {
154  socketp->setMode( SocketNotifier::Write );
155  } else if ( what == CURL_POLL_INOUT ) {
156  socketp->setMode( SocketNotifier::Read | SocketNotifier::Write );
157  }
158 
159  socketp->setEnabled();
160  return 0;
161 }
162 
163 void NetworkRequestDispatcherPrivate::onSocketActivated( const SocketNotifier &listener, int events )
164 {
165  int evBitmask = 0;
166  if ( (events & SocketNotifier::Read) == SocketNotifier::Read )
167  evBitmask |= CURL_CSELECT_IN;
168  if ( (events & SocketNotifier::Write) == SocketNotifier::Write )
169  evBitmask |= CURL_CSELECT_OUT;
170  if ( (events & SocketNotifier::Error) == SocketNotifier::Error )
171  evBitmask |= CURL_CSELECT_ERR;
172 
173  handleMultiSocketAction( listener.socket(), evBitmask );
174 }
175 
// Passes a socket event (or the timeout pseudo socket) to curl_multi_socket_action()
// and afterwards processes all completion messages curl has queued.
// A request that reports more work is re-prepared and added back to the multi
// handle, every other completed request is finished via setFinished().
// nativeSocket: socket the event happened on; evBitmask: CURL_CSELECT_* event mask.
// NOTE(review): listing lines 192, 232 and 242 are absent from this extract (the
// declarations of `err` and `e` are not visible) - confirm against the original file.
176 void NetworkRequestDispatcherPrivate::handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
177 {
178  int running = 0;
179 
180  // when inside a curl callback we can not call another multi curl API,
181  // for now just lock the thing, but we should consider rewriting this
182  // to post events instead of doing direct calls simply to decouple from
183  // that limitation
184  CURLMcode rc = CURLM_OK;
185  {
// _locked is only set for the duration of this scope; dequeuePending() returns early while it is set
186  zypp::DtorReset lockSet( _locked );
187  _locked = true;
188  rc = curl_multi_socket_action( _multi, nativeSocket, evBitmask, &running );
189  }
190  if (rc != 0) {
191  //we can not recover from an error like that, cancel all and stop
193  cancelAll( err );
194  //emit error
195  _lastError = err;
196  _sigError.emit( *z_func() );
197  return;
198  }
199 
200  // make sure we dequeue pending requests ( in case a call to dequeue was blocked during the API call )
201  zypp::OnScopeExit scopeFinally([this](){
202  this->dequeuePending();
203  });
204 
205  int msgs_left = 0;
206  CURLMsg *msg = nullptr;
207  while( (msg = curl_multi_info_read( _multi, &msgs_left )) ) {
208  if(msg->msg == CURLMSG_DONE) {
209  CURL *easy = msg->easy_handle;
210  CURLcode res = msg->data.result;
211 
// the private pointer of the easy handle is interpreted as the owning NetworkRequestPrivate below
212  void *privatePtr = nullptr;
213  if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
214  WAR << "Unable to get CURLINFO_PRIVATE" << std::endl;
215  continue;
216  }
217 
218  if ( !privatePtr ) {
219  //broken easy handle not associated, should never happen but clean it up
220  WAR << "Cleaning up unassigned easy handle" << std::endl;
221  curl_multi_remove_handle( _multi, easy );
222  curl_easy_cleanup( easy );
223  continue;
224  }
225 
226  NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
227  request->dequeueNotify();
228 
// continue the request only if it has more work and either succeeded or reports it can recover
229  if ( request->hasMoreWork() && ( res == CURLE_OK || request->canRecover() ) ) {
230  std::string errBuf = "Broken easy handle in request";
231  if ( !request->_easyHandle ) {
233  setFinished( *request->z_func(), e );
234  continue;
235  }
236 
237  // remove the handle from multi to change options
238  curl_multi_remove_handle( _multi, request->_easyHandle );
239 
240  errBuf = "Failed to reinitialize the request";
241  if ( !request->prepareToContinue ( errBuf ) ) {
243  setFinished( *request->z_func(), e );
244  } else {
245  // add the request back to the multi handle, it is not done
246  if ( !addRequestToMultiHandle( *request->z_func() ) )
247  continue;
248 
249  request->aboutToStart( );
250  }
251  } else {
252  //trigger notification about file downloaded
253  NetworkRequestError e = NetworkRequestErrorPrivate::fromCurlError( *request->z_func(), res, request->errorMessage() );
254  setFinished( *request->z_func(), e );
255  }
256  //attention request could be deleted from here on
257  }
258  }
259 }
260 
// NOTE(review): the signature line (listing line 261) is absent from this extract;
// presumably this is cancelAll( NetworkRequestError result ) as listed in the symbol
// index at the end of the listing - confirm against the original file.
// Finishes every running and every pending request with `result`.
262 {
263  //prevent dequeuePending from filling up the runningDownloads again
264  zypp::DtorReset lockReset( _locked );
265  _locked = true;
266 
// each loop relies on setFinished() removing the request from its queue
267  while ( _runningDownloads.size() ) {
268  std::shared_ptr<NetworkRequest> &req = _runningDownloads.back();
269  setFinished(*req, result );
270  }
271  while ( _pendingDownloads.size() ) {
272  std::shared_ptr<NetworkRequest> &req = _pendingDownloads.back();
273  setFinished(*req, result );
274  }
275 }
276 
// NOTE(review): the signature line (listing line 277) is absent from this extract;
// presumably this is setFinished( NetworkRequest &req, NetworkRequestError result )
// as listed in the symbol index at the end of the listing - confirm against the original file.
// Removes `req` from the running/pending queues and from the multi handle, stores
// `result` in the request, emits the download-finished signal and then tries to
// dequeue further requests.
278 {
// helper: removes the entry referring to `req` from `list` and returns the removed
// shared_ptr, or nullptr if the list holds no such entry
279  auto delReq = []( auto &list, NetworkRequest &req ) -> std::shared_ptr<NetworkRequest> {
280  auto it = std::find_if( list.begin(), list.end(), [ &req ]( const std::shared_ptr<NetworkRequest> &r ) {
281  return req.d_func() == r->d_func();
282  } );
283  if ( it != list.end() ) {
284  auto ptr = *it;
285  list.erase( it );
286  return ptr;
287  }
288  return nullptr;
289  };
290 
291  // We have a tricky situation if a network request is called when inside a callback. In those cases, it is
292  // not allowed to call curl_multi_remove_handle. We need to tell the callback to fail, so the download
293  // is cancelled by curl itself. We also need to store the current result for later
294  auto rmode = std::get_if<NetworkRequestPrivate::running_t>( &req.d_func()->_runningMode );
295  if ( rmode ) {
296  if ( rmode->_isInCallback ) {
297  // the first cached result wins
298  if ( !rmode->_cachedResult )
299  rmode->_cachedResult = result;
300  return;
301  } else if ( rmode->_cachedResult ) {
302  result = rmode->_cachedResult.value();
303  }
304  }
305 
// rLocked holds the shared_ptr copy returned by delReq for the rest of this function
306  auto rLocked = delReq( _runningDownloads, req );
307  if ( !rLocked )
308  rLocked = delReq( _pendingDownloads, req );
309 
310  void *easyHandle = req.d_func()->_easyHandle;
311  if ( easyHandle )
312  curl_multi_remove_handle( _multi, easyHandle );
313 
314  req.d_func()->_dispatcher = nullptr;
315 
316  //first set the result, the Request might have a checksum to check as well so a currently
317  //successful request could fail later on
318  req.d_func()->setResult( std::move(result) );
319  _sigDownloadFinished.emit( *z_func(), req );
320 
321  //we got an open slot, try to dequeue or send the finished signals if all queues are empty
322  dequeuePending();
323 }
324 
// NOTE(review): the signature line (listing line 325) and listing line 329 are absent
// from this extract; presumably this is addRequestToMultiHandle( NetworkRequest &req ),
// which is called with a NetworkRequest elsewhere in this file - confirm against the original file.
// Adds the easy handle of `req` to the curl multi handle.
// Returns true on success, false if curl_multi_add_handle() reported an error.
326 {
327  CURLMcode rc = curl_multi_add_handle( _multi, req.d_func()->_easyHandle );
328  if ( rc != 0 ) {
330  return false;
331  }
332  return true;
333 }
334 
// NOTE(review): the signature line (listing line 335) and listing line 350 are absent
// from this extract; presumably this is dequeuePending() - confirm against the original file.
// Moves requests from the front of the pending queue into the running state while
// the connection limit allows it, and emits the queue-finished signal once both
// queues are empty.
336 {
// nothing is dequeued while the dispatcher is not running or is locked
337  if ( !_isRunning || _locked )
338  return;
339 
// a _maxConnections of -1 disables the limit check
340  while ( _maxConnections == -1 || ( (std::size_t)_maxConnections > _runningDownloads.size() ) ) {
341  if ( !_pendingDownloads.size() )
342  break;
343 
344  std::shared_ptr<NetworkRequest> req = std::move( _pendingDownloads.front() );
345  _pendingDownloads.pop_front();
346 
347  std::string errBuf = "Failed to initialize easy handle";
348  if ( !req->d_func()->initialize( errBuf ) ) {
349  //@TODO store the CURL error in the errors extra info
351  continue;
352  }
353 
354  if ( !addRequestToMultiHandle( *req ) )
355  continue;
356 
357  req->d_func()->aboutToStart();
358  _sigDownloadStarted.emit( *z_func(), *req );
359 
360  _runningDownloads.push_back( std::move(req) );
361  }
362 
363  //check for empty queues
364  if ( _pendingDownloads.size() == 0 && _runningDownloads.size() == 0 ) {
365  //once we finished all requests, cancel the timer too, so curl is not called without requests
366  _timer->stop();
367  _sigQueueFinished.emit( *z_func() );
368  }
369 }
370 
371 ZYPP_IMPL_PRIVATE(NetworkRequestDispatcher)
372 
373 NetworkRequestDispatcher::NetworkRequestDispatcher( )
374  : Base( * new NetworkRequestDispatcherPrivate ( *this ) )
375 {
376 
377 }
378 
379 bool NetworkRequestDispatcher::supportsProtocol( const Url &url )
380 {
381  curl_version_info_data *curl_info = nullptr;
382  curl_info = curl_version_info(CURLVERSION_NOW);
383  // curl_info does not need any free (is static)
384  if (curl_info->protocols)
385  {
386  const char * const *proto;
387  std::string scheme( url.getScheme() );
388  bool found = false;
389  for(proto=curl_info->protocols; !found && *proto; ++proto) {
390  if( scheme == std::string((const char *)*proto))
391  found = true;
392  }
393  return found;
394  }
395  return true;
396 }
397 
398 void NetworkRequestDispatcher::setMaximumConcurrentConnections( const int maxConn )
399 {
400  d_func()->_maxConnections = maxConn;
401 }
402 
403 int NetworkRequestDispatcher::maximumConcurrentConnections () const
404 {
405  return d_func()->_maxConnections;
406 }
407 
408 void NetworkRequestDispatcher::enqueue(const std::shared_ptr<NetworkRequest> &req )
409 {
410  if ( !req )
411  return;
412  Z_D();
413 
414  if ( std::find( d->_runningDownloads.begin(), d->_runningDownloads.end(), req ) != d->_runningDownloads.end() ) {
415  WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already running " << std::endl;
416  return;
417  }
418 
419  if ( std::find( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), req ) != d->_pendingDownloads.end() ) {
420  WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already enqueued " << std::endl;
421  return;
422  }
423 
424  req->d_func()->_dispatcher = this;
425  if ( req->priority() == NetworkRequest::Normal )
426  d->_pendingDownloads.push_back( req );
427  else {
428  auto it = std::find_if( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), [ prio = req->priority() ]( const auto &pendingReq ){
429  return pendingReq->priority() < prio;
430  });
431 
432  //if we have a valid iterator, decrement we found a pending download request with lower prio, insert before that
433  if ( it != d->_pendingDownloads.end() && it != d->_pendingDownloads.begin() )
434  it--;
435  d->_pendingDownloads.insert( it, req );
436  }
437 
438  //dequeue if running and we have capacity
439  d->dequeuePending();
440 }
441 
442 void NetworkRequestDispatcher::setAgentString( const std::string &agent )
443 {
444  Z_D();
445  if ( agent.empty() )
446  d->_userAgent = defaultAgentString();
447  else
448  d->_userAgent = agent;
449 }
450 
451 const std::string &NetworkRequestDispatcher::agentString() const
452 {
453  return d_func()->_userAgent;
454 }
455 
456 void NetworkRequestDispatcher::setHostSpecificHeader( const std::string &host, const std::string &headerName, const std::string &value )
457 {
458  Z_D();
459  if ( value.empty() ) {
460  if ( auto i = d->_customHeaders.find( host ); i != d->_customHeaders.end() ) {
461  if ( auto v = i->second.find( headerName ); v != i->second.end() ) {
462  i->second.erase (v);
463  }
464  if ( i->second.empty() )
465  d->_customHeaders.erase(i);
466  }
467  return;
468  }
469  d->_customHeaders[host][headerName] = value;
470 }
471 
472 const NetworkRequestDispatcher::SpecificHeaderMap &NetworkRequestDispatcher::hostSpecificHeaders() const
473 {
474  return d_func()->_customHeaders;
475 }
476 
477 void NetworkRequestDispatcher::cancel( NetworkRequest &req, std::string reason )
478 {
479  cancel( req, NetworkRequestErrorPrivate::customError( NetworkRequestError::Cancelled, reason.size() ? std::move(reason) : "Request explicitly cancelled" ) );
480 }
481 
482 void NetworkRequestDispatcher::cancel(NetworkRequest &req, const NetworkRequestError &err)
483 {
484  Z_D();
485 
486  if ( req.d_func()->_dispatcher != this ) {
487  //TODO throw exception
488  return;
489  }
490 
491  d->setFinished( req, err );
492 }
493 
494 void NetworkRequestDispatcher::run()
495 {
496  Z_D();
497  d->_isRunning = true;
498 
499  if ( d->_pendingDownloads.size() )
500  d->dequeuePending();
501 }
502 
503 void NetworkRequestDispatcher::reschedule()
504 {
505  Z_D();
506  if ( !d->_pendingDownloads.size() )
507  return;
508 
509  std::stable_sort( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), []( const auto &a, const auto &b ){
510  return a->priority() < b->priority();
511  });
512 
513  d->dequeuePending();
514 }
515 
516 size_t NetworkRequestDispatcher::count()
517 {
518  Z_D();
519  return d->_pendingDownloads.size() + d->_runningDownloads.size();
520 }
521 
522 const zyppng::NetworkRequestError &NetworkRequestDispatcher::lastError() const
523 {
524  return d_func()->_lastError;
525 }
526 
527 SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadStarted()
528 {
529  return d_func()->_sigDownloadStarted;
530 }
531 
532 SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadFinished()
533 {
534  return d_func()->_sigDownloadFinished;
535 }
536 
537 SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigQueueFinished()
538 {
539  return d_func()->_sigQueueFinished;
540 }
541 
542 SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigError()
543 {
544  return d_func()->_sigError;
545 }
546 
547 }
void globalInitCurlOnce()
Definition: curlhelper.cc:64
std::string errorMessage() const
Definition: request.cc:564
static int multi_timer_cb(CURLM *multi, long timeout_ms, void *g)
void cancelAll(NetworkRequestError result)
ZYPP_IMPL_PRIVATE(Provide)
Boost libraries.
unsigned short b
int socketCallback(CURL *easy, curl_socket_t s, int what, void *)
static const std::string & defaultAgentString()
static zyppng::NetworkRequestError fromCurlMError(int nativeCode)
#define L_ENV_CONSTR_DEFINE_FUNC(ENV)
Definition: Logger.h:113
std::string form(const char *format,...) __attribute__((format(printf
Printf style construction of std::string.
Definition: String.cc:36
Edition * _value
Definition: SysContent.cc:311
Signal< void(NetworkRequestDispatcher &)> _sigQueueFinished
Assign a variable a certain value when going out of scope.
Definition: dtorreset.h:49
std::map< curl_socket_t, std::shared_ptr< SocketNotifier > > _socketHandler
static zyppng::NetworkRequestError fromCurlError(NetworkRequest &req, int nativeCode, const std::string &nativeError)
static int static_socket_callback(CURL *easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp)
Provides API related macros.
std::deque< std::shared_ptr< NetworkRequest > > _pendingDownloads
#define WAR
Definition: Logger.h:97
void handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
The NetworkRequestError class represents an error that occurred in.
const long & ZYPP_MEDIA_CURL_DEBUG()
const long& for setting CURLOPT_DEBUGDATA Returns a reference to a static variable, so it's safe to pass ...
Definition: curlhelper.cc:36
Signal< void(NetworkRequestDispatcher &)> _sigError
const char * agentString()
Definition: MediaCurl.cc:282
Signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadStarted
void setFinished(NetworkRequest &req, NetworkRequestError result)
#define LIBZYPP_VERSION_STRING
Definition: APIConfig.h:15
unsigned short a
Signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadFinished
std::vector< std::shared_ptr< NetworkRequest > > _runningDownloads
void onSocketActivated(const SocketNotifier &listener, int events)
static zyppng::NetworkRequestError customError(NetworkRequestError::Type t, std::string &&errorMsg="", std::map< std::string, boost::any > &&extraInfo={})
bool prepareToContinue(std::string &errBuf)
Definition: request.cc:411