14 #include <zypp-core/zyppng/base/Timer> 15 #include <zypp-core/zyppng/base/SocketNotifier> 16 #include <zypp-core/zyppng/base/EventDispatcher> 20 #include <zypp/base/Logger.h> 21 #include <zypp/base/String.h> 22 #include <zypp-core/base/DtorReset> 24 using namespace boost;
37 static const std::string
_value(
40 , curl_version_info(CURLVERSION_NOW)->version
47 NetworkRequestDispatcherPrivate::NetworkRequestDispatcherPrivate( NetworkRequestDispatcher &p )
49 , _timer( Timer::create() )
50 , _multi ( curl_multi_init() )
56 curl_multi_setopt(
_multi, CURLMOPT_TIMERDATA, reinterpret_cast<void *>(
this ) );
58 curl_multi_setopt(
_multi, CURLMOPT_SOCKETDATA, reinterpret_cast<void *>(
this ) );
64 _timer->setSingleShot(
true );
71 curl_multi_cleanup(
_multi );
78 assert( that !=
nullptr );
80 if ( timeout_ms >= 0 ) {
81 that->
_timer->start( static_cast<uint64_t>(timeout_ms) );
97 assert( that !=
nullptr );
103 std::shared_ptr<SocketNotifier> socketp;
106 if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
109 socketp = SocketNotifier::create( s, SocketNotifier::Read,
false );
119 if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
125 void *privatePtr =
nullptr;
126 if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
127 privatePtr =
nullptr;
137 WAR <<
"Cleaning up unassigned easy handle" << std::endl;
138 curl_multi_remove_handle(
_multi, easy );
139 curl_easy_cleanup( easy );
145 if ( what == CURL_POLL_REMOVE ) {
146 socketp->setEnabled(
false );
151 if ( what == CURL_POLL_IN ) {
152 socketp->setMode( SocketNotifier::Read );
153 }
else if ( what == CURL_POLL_OUT ) {
154 socketp->setMode( SocketNotifier::Write );
155 }
else if ( what == CURL_POLL_INOUT ) {
156 socketp->setMode( SocketNotifier::Read | SocketNotifier::Write );
159 socketp->setEnabled();
166 if ( (events & SocketNotifier::Read) == SocketNotifier::Read )
167 evBitmask |= CURL_CSELECT_IN;
168 if ( (events & SocketNotifier::Write) == SocketNotifier::Write )
169 evBitmask |= CURL_CSELECT_OUT;
171 evBitmask |= CURL_CSELECT_ERR;
184 CURLMcode rc = CURLM_OK;
188 rc = curl_multi_socket_action(
_multi, nativeSocket, evBitmask, &running );
206 CURLMsg *msg =
nullptr;
207 while( (msg = curl_multi_info_read(
_multi, &msgs_left )) ) {
208 if(msg->msg == CURLMSG_DONE) {
209 CURL *easy = msg->easy_handle;
210 CURLcode res = msg->data.result;
212 void *privatePtr =
nullptr;
213 if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
214 WAR <<
"Unable to get CURLINFO_PRIVATE" << std::endl;
220 WAR <<
"Cleaning up unassigned easy handle" << std::endl;
221 curl_multi_remove_handle(
_multi, easy );
222 curl_easy_cleanup( easy );
230 std::string errBuf =
"Broken easy handle in request";
240 errBuf =
"Failed to reinitialize the request";
279 auto delReq = [](
auto &list,
NetworkRequest &req ) -> std::shared_ptr<NetworkRequest> {
280 auto it = std::find_if( list.begin(), list.end(), [ &req ](
const std::shared_ptr<NetworkRequest> &r ) {
281 return req.d_func() == r->d_func();
283 if ( it != list.end() ) {
294 auto rmode = std::get_if<NetworkRequestPrivate::running_t>( &req.d_func()->_runningMode );
296 if ( rmode->_isInCallback ) {
298 if ( !rmode->_cachedResult )
299 rmode->_cachedResult = result;
301 }
else if ( rmode->_cachedResult ) {
302 result = rmode->_cachedResult.value();
310 void *easyHandle = req.d_func()->_easyHandle;
312 curl_multi_remove_handle(
_multi, easyHandle );
314 req.d_func()->_dispatcher =
nullptr;
318 req.d_func()->setResult( std::move(result) );
327 CURLMcode rc = curl_multi_add_handle(
_multi, req.d_func()->_easyHandle );
347 std::string errBuf =
"Failed to initialize easy handle";
348 if ( !req->d_func()->initialize( errBuf ) ) {
357 req->d_func()->aboutToStart();
373 NetworkRequestDispatcher::NetworkRequestDispatcher( )
379 bool NetworkRequestDispatcher::supportsProtocol(
const Url &url )
381 curl_version_info_data *curl_info =
nullptr;
382 curl_info = curl_version_info(CURLVERSION_NOW);
384 if (curl_info->protocols)
386 const char *
const *proto;
387 std::string scheme( url.getScheme() );
389 for(proto=curl_info->protocols; !found && *proto; ++proto) {
390 if( scheme == std::string((
const char *)*proto))
398 void NetworkRequestDispatcher::setMaximumConcurrentConnections(
const int maxConn )
400 d_func()->_maxConnections = maxConn;
403 int NetworkRequestDispatcher::maximumConcurrentConnections ()
const 405 return d_func()->_maxConnections;
408 void NetworkRequestDispatcher::enqueue(
const std::shared_ptr<NetworkRequest> &req )
414 if ( std::find( d->_runningDownloads.begin(), d->_runningDownloads.end(), req ) != d->_runningDownloads.end() ) {
415 WAR <<
"Ignoring request to enqueue download " << req->url().asString() <<
" request is already running " << std::endl;
419 if ( std::find( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), req ) != d->_pendingDownloads.end() ) {
420 WAR <<
"Ignoring request to enqueue download " << req->url().asString() <<
" request is already enqueued " << std::endl;
424 req->d_func()->_dispatcher =
this;
426 d->_pendingDownloads.push_back( req );
428 auto it = std::find_if( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), [ prio = req->priority() ](
const auto &pendingReq ){
429 return pendingReq->priority() < prio;
433 if ( it != d->_pendingDownloads.end() && it != d->_pendingDownloads.begin() )
435 d->_pendingDownloads.insert( it, req );
442 void NetworkRequestDispatcher::setAgentString(
const std::string &agent )
448 d->_userAgent = agent;
453 return d_func()->_userAgent;
456 void NetworkRequestDispatcher::setHostSpecificHeader(
const std::string &host,
const std::string &headerName,
const std::string &value )
459 if ( value.empty() ) {
460 if (
auto i = d->_customHeaders.find( host ); i != d->_customHeaders.end() ) {
461 if (
auto v = i->second.find( headerName ); v != i->second.end() ) {
464 if ( i->second.empty() )
465 d->_customHeaders.erase(i);
469 d->_customHeaders[host][headerName] = value;
472 const NetworkRequestDispatcher::SpecificHeaderMap &NetworkRequestDispatcher::hostSpecificHeaders()
const 474 return d_func()->_customHeaders;
477 void NetworkRequestDispatcher::cancel( NetworkRequest &req, std::string reason )
482 void NetworkRequestDispatcher::cancel(NetworkRequest &req,
const NetworkRequestError &err)
486 if ( req.d_func()->_dispatcher != this ) {
491 d->setFinished( req, err );
494 void NetworkRequestDispatcher::run()
497 d->_isRunning =
true;
499 if ( d->_pendingDownloads.size() )
503 void NetworkRequestDispatcher::reschedule()
506 if ( !d->_pendingDownloads.size() )
509 std::stable_sort( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), [](
const auto &
a,
const auto &
b ){
510 return a->priority() <
b->priority();
516 size_t NetworkRequestDispatcher::count()
519 return d->_pendingDownloads.size() + d->_runningDownloads.size();
524 return d_func()->_lastError;
527 SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadStarted()
529 return d_func()->_sigDownloadStarted;
532 SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadFinished()
534 return d_func()->_sigDownloadFinished;
537 SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigQueueFinished()
539 return d_func()->_sigQueueFinished;
542 SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigError()
544 return d_func()->_sigError;
void globalInitCurlOnce()
std::string errorMessage() const
NetworkRequestError _lastError
static int multi_timer_cb(CURLM *multi, long timeout_ms, void *g)
void cancelAll(NetworkRequestError result)
ZYPP_IMPL_PRIVATE(Provide)
int socketCallback(CURL *easy, curl_socket_t s, int what, void *)
static const std::string & defaultAgentString()
static zyppng::NetworkRequestError fromCurlMError(int nativeCode)
#define L_ENV_CONSTR_DEFINE_FUNC(ENV)
std::string form(const char *format,...) __attribute__((format(printf
Printf style construction of std::string.
Signal< void(NetworkRequestDispatcher &)> _sigQueueFinished
Assign a certain value to a variable when going out of scope.
std::shared_ptr< Timer > _timer
std::map< curl_socket_t, std::shared_ptr< SocketNotifier > > _socketHandler
static zyppng::NetworkRequestError fromCurlError(NetworkRequest &req, int nativeCode, const std::string &nativeError)
bool addRequestToMultiHandle(NetworkRequest &req)
static int static_socket_callback(CURL *easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp)
Provides API related macros.
std::deque< std::shared_ptr< NetworkRequest > > _pendingDownloads
void multiTimerTimout(const Timer &t)
virtual ~NetworkRequestDispatcherPrivate()
void handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
The NetworkRequestError class represents an error that occurred in.
const long & ZYPP_MEDIA_CURL_DEBUG()
const long& for setting CURLOPT_DEBUGDATA Returns a reference to a static variable, so it's safe to pass ...
Signal< void(NetworkRequestDispatcher &)> _sigError
const char * agentString()
Signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadStarted
void setFinished(NetworkRequest &req, NetworkRequestError result)
#define LIBZYPP_VERSION_STRING
Signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadFinished
std::vector< std::shared_ptr< NetworkRequest > > _runningDownloads
void onSocketActivated(const SocketNotifier &listener, int events)
static zyppng::NetworkRequestError customError(NetworkRequestError::Type t, std::string &&errorMsg="", std::map< std::string, boost::any > &&extraInfo={})
bool prepareToContinue(std::string &errBuf)