libzypp  17.31.31
rangedownloader_p.cc
Go to the documentation of this file.
1 /*---------------------------------------------------------------------\
2 | ____ _ __ __ ___ |
3 | |__ / \ / / . \ . \ |
4 | / / \ V /| _/ _/ |
5 | / /__ | | | | | | |
6 | /_____||_| |_| |_| |
7 | |
8 ----------------------------------------------------------------------*/
9 
14 #include <zypp-core/AutoDispose.h>
15 #include <zypp-core/fs/PathInfo.h>
16 
17 #include "rangedownloader_p.h"
18 
19 namespace zyppng {
20 
22  { }
23 
24  void RangeDownloaderBaseState::onRequestProgress( NetworkRequest &, off_t , off_t, off_t , off_t )
25  {
26  off_t dlnowMulti = _downloadedMultiByteCount;
27  for( const auto &req : _runningRequests ) {
28  dlnowMulti += req->downloadedByteCount();
29  }
30 
31  if ( !assertExpectedFilesize( dlnowMulti ) )
32  return;
33 
34  stateMachine()._sigProgress.emit( *stateMachine().z_func(), _fileSize, dlnowMulti );
35  }
36 
38  {
39  auto lck = stateMachine().z_func()->shared_from_this();
40  auto it = std::find_if( _runningRequests.begin(), _runningRequests.end(), [ &req ]( const std::shared_ptr<Request> &r ) {
41  return ( r.get() == &req );
42  });
43  if ( it == _runningRequests.end() )
44  return;
45 
46  auto reqLocked = *it;
47 
48  //remove from running
49  _runningRequests.erase( it );
50 
51  //feed the working URL back into the mirrors in case there are still running requests that might fail
52  // @TODO , finishing the transfer might never be called in case of cancelling the request, need a better way to track running transfers
53  if ( reqLocked->_myMirror )
54  reqLocked->_myMirror->finishTransfer( !err.isError() );
55 
56  if ( err.isError() ) {
57  return handleRequestError( reqLocked, err );
58  }
59 
62  return;
63  }
64 
65  MIL << req.nativeHandle() << " " << "Request finished successfully."<<std::endl;
66  const auto &rngs = reqLocked->requestedRanges();
67  std::for_each( rngs.begin(), rngs.end(), [&req]( const auto &b ){ DBG_MEDIA << req.nativeHandle() << " " << "-> Block " << b.start << " finished." << std::endl; } );
68 
69  auto restartReqWithBlock = [ this ]( std::shared_ptr<Request> &req, std::vector<Block> &&blocks ) {
70  MIL << req->nativeHandle() << " " << "Reusing Request to download blocks:"<<std::endl;
71  if ( !addBlockRanges( req, std::move( blocks ) ) )
72  return false;
73 
74  //this is not a new request, only add to queues but do not connect signals again
75  addNewRequest( req, false );
76  return true;
77  };
78 
79  //check if we already have enqueued all blocks if not reuse the request
80  if ( _ranges.size() ) {
81  MIL << req.nativeHandle() << " " << "Reusing to download blocks: "<<std::endl;
82  if ( !restartReqWithBlock( reqLocked, getNextBlocks( reqLocked->url().getScheme() ) ) ) {
83  return setFailed( "Failed to restart request with new blocks." );
84  }
85  return;
86 
87  } else {
88  //if we have failed blocks, try to download them with this mirror
89  if ( !_failedRanges.empty() ) {
90 
91  auto fblks = getNextFailedBlocks( reqLocked->url().getScheme() );
92  MIL << req.nativeHandle() << " " << "Reusing to download failed blocks: "<<std::endl;
93  if ( !restartReqWithBlock( reqLocked, std::move(fblks) ) ) {
94  return setFailed( "Failed to restart request with previously failed blocks." );
95  }
96  return;
97  }
98  }
99 
100  //feed the working URL back into the mirrors in case there are still running requests that might fail
101  _fileMirrors.push_back( reqLocked->_originalUrl );
102 
103  // make sure downloads are running, at this point
105  }
106 
107  void RangeDownloaderBaseState::handleRequestError( std::shared_ptr<Request> req, const zyppng::NetworkRequestError &err )
108  {
109  bool retry = false;
110  auto &parent = stateMachine();
111 
112 
113  //Handle the auth errors explicitly, we need to give the user a way to put in new credentials
114  //if we get valid new credentials we can retry the request
116  retry = parent.handleRequestAuthError( req, err );
117  } else {
118 
119  //if an error happens during a multi download we try to use another mirror to download the failed block
120  MIL << req->nativeHandle() << " " << "Request failed " << req->extendedErrorString() << "(" << req->url() << ")" << std::endl;
121  if ( req->lastRedirectInfo ().size () )
122  MIL << req->nativeHandle() << " Last redirection target was: " << req->lastRedirectInfo () << std::endl;
123 
124  NetworkRequestError dummyErr;
125 
126  const auto &fRanges = req->failedRanges();
127  try {
128  std::transform( fRanges.begin(), fRanges.end(), std::back_inserter(_failedRanges), [ &req ]( const auto &r ){
129  Block b = std::any_cast<Block>(r.userData);;
130  b._failedWithErr = req->error();
132  DBG_MEDIA << "Adding failed block to failed blocklist: " << b.start << " " << b.len << " (" << req->error().toString() << " [" << req->error().nativeErrorString()<< "])" << std::endl;
133  return b;
134  });
135 
136  // try to fill the open spot right away
138  return;
139 
140  } catch ( const zypp::Exception &ex ) {
141  //we just log the exception and fall back to a normal download
142  WAR << "Multipart download failed: " << ex.asString() << std::endl;
143  }
144  }
145 
146  //if retry is true we just enqueue the request again, usually this means authentication was updated
147  if ( retry ) {
148  //make sure this request will run asap
149  req->setPriority( parent._defaultSubRequestPriority );
150 
151  //this is not a new request, only add to queues but do not connect signals again
152  addNewRequest( req, false );
153  return;
154  }
155 
156  //we do not have more mirrors left we can try
157  cancelAll ( err );
158 
159  // not all hope is lost, maybe a normal download can work out?
160  // fall back to normal download
161  _sigFailed.emit();
162  }
163 
165  {
167  return;
168 
169  zypp::OnScopeExit clearFlag( [this]() {
171  });
172 
174 
175  //check if there is still work to do
176  while ( _ranges.size() || _failedRanges.size() ) {
177 
178  // download was already finished
179  if ( _error.isError() )
180  return;
181 
182  if ( _runningRequests.size() >= 10 )
183  break;
184 
185  // prepareNextMirror will automatically call mirrorReceived() once there is a mirror ready
186  const auto &res = prepareNextMirror();
187  // if mirrors are delayed we stop here, once the mirrors are ready we get called again
189  return;
190  else if ( res == MirrorHandlingStateBase::Failed ) {
191  failedToPrepare();
192  return;
193  }
194  }
195 
196  // check if we are done at this point
197  if ( _runningRequests.empty() ) {
198 
199  if ( _failedRanges.size() || _ranges.size() ) {
201  return;
202  }
203 
204  // seems we were successful, transition to finished state
205  setFinished();
206  }
207  }
208 
210  {
211 
212  auto &parent = stateMachine();
213  Url myUrl;
214  TransferSettings settings;
215 
216  auto err = setupMirror( mirror, myUrl, settings );
217  if ( err.isError() ) {
218  WAR << "Failure to setup mirror " << myUrl << " with error " << err.toString() << "("<< err.nativeErrorString() << "), dropping it from the list of mirrors." << std::endl;
219  // if a mirror fails , we remove it from our list
220  _fileMirrors.erase( mirror.first );
221 
222  // make sure this is retried
224  return;
225  }
226 
227  auto blocks = getNextBlocks( myUrl.getScheme() );
228  if ( !blocks.size() )
229  blocks = getNextFailedBlocks( myUrl.getScheme() );
230 
231  if ( !blocks.size() ) {
232  // We have no blocks. In theory, that should never happen, but for safety, we error out here. It is better than
233  // getting stuck.
234  setFailed( NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, "Mirror requested after all blocks were downloaded." ) );
235  return;
236  }
237 
238  const auto &spec = parent._spec;
239 
240  std::shared_ptr<Request> req = std::make_shared<Request>( ::internal::clearQueryString( myUrl ), spec.targetPath(), NetworkRequest::WriteShared );
241  req->_myMirror = mirror.second;
242  req->_originalUrl = myUrl;
243  req->setPriority( parent._defaultSubRequestPriority );
244  req->transferSettings() = settings;
245 
246  // if we download chunks we do not want to wait for too long on mirrors that have slow activity
247  // note: this sets the activity timeout, not the download timeout
248  req->transferSettings().setTimeout( 2 );
249 
250  MIL << "Creating Request to download blocks via mirror: " << myUrl << std::endl;
251  if ( !addBlockRanges( req, std::move(blocks) ) ) {
253  return;
254  }
255 
256  // we just use a mirror once per file, remove it from the list
257  _fileMirrors.erase( mirror.first );
258 
259  addNewRequest( req );
260 
261  // trigger next downloads
263  }
264 
266  {
267  // it was impossible to find a new mirror, check if we still have running requests we can wait for, if not
268  // we can only fail at this point
269  if ( !_runningRequests.size() ) {
271  }
272  }
273 
275  {
276  bool triggerResched = false;
277  for ( auto &req : _runningRequests ) {
278  if ( req->state() == NetworkRequest::Pending ) {
279  triggerResched = true;
280  req->setPriority( NetworkRequest::Critical, false );
281  }
282  }
283  if ( triggerResched )
284  stateMachine()._requestDispatcher->reschedule();
285  }
286 
287  void RangeDownloaderBaseState::addNewRequest(std::shared_ptr<Request> req , const bool connectSignals)
288  {
289  if ( connectSignals )
290  req->connectSignals( *this );
291 
292  _runningRequests.push_back( req );
293  stateMachine()._requestDispatcher->enqueue( req );
294 
295  if ( req->_myMirror )
296  req->_myMirror->startTransfer();
297  }
298 
300  {
301  const off_t expFSize = stateMachine()._spec.expectedFileSize();
302  if ( expFSize > 0 && expFSize < currentFilesize ) {
304  return false;
305  }
306  return true;
307  }
308 
312  bool RangeDownloaderBaseState::addBlockRanges ( std::shared_ptr<Request> req , std::vector<Block> &&blocks ) const
313  {
314  req->resetRequestRanges();
315  for ( const auto &block : blocks ) {
316  if ( block.chksumVec && block.chksumtype.size() ) {
317  std::optional<zypp::Digest> dig = zypp::Digest();
318  if ( !dig->create( block.chksumtype ) ) {
319  WAR_MEDIA << "Trying to create Digest with chksum type " << block.chksumtype << " failed " << std::endl;
320  return false;
321  }
322 
324  DBG_MEDIA << "Starting block " << block.start << " with checksum " << zypp::Digest::digestVectorToString( *block.chksumVec ) << "." << std::endl;
325  req->addRequestRange( block.start, block.len, std::move(dig), *block.chksumVec, std::any( block ), block.chksumCompareLen, block.chksumPad );
326  } else {
327 
329  DBG_MEDIA << "Starting block " << block.start << " without checksum." << std::endl;
330  req->addRequestRange( block.start, block.len, {}, {}, std::any( block ) );
331  }
332  }
333  return true;
334  }
335 
337  {
338  _error = std::move( err );
339  cancelAll( _error );
340  zypp::filesystem::unlink( stateMachine()._spec.targetPath() );
341  _sigFailed.emit();
342  }
343 
344  void RangeDownloaderBaseState::setFailed(std::string &&reason)
345  {
347  }
348 
350  {
352  _sigFinished.emit();
353  }
354 
356  {
357  while( _runningRequests.size() ) {
358  auto req = _runningRequests.back();
359  req->disconnectSignals();
360  _runningRequests.pop_back();
361  stateMachine()._requestDispatcher->cancel( *req, err );
362  if ( req->_myMirror )
363  req->_myMirror->cancelTransfer();
364  }
365  }
366 
367  std::vector<RangeDownloaderBaseState::Block> RangeDownloaderBaseState::getNextBlocks( const std::string &urlScheme )
368  {
369  std::vector<Block> blocks;
370  const auto prefSize = std::max<zypp::ByteCount>( _preferredChunkSize, zypp::ByteCount(4, zypp::ByteCount::K) );
371  size_t accumulatedSize = 0;
372 
373  bool canDoRandomBlocks = ( zypp::str::hasPrefixCI( urlScheme, "http") );
374 
375  std::optional<size_t> lastBlockEnd;
376  while ( _ranges.size() && accumulatedSize < prefSize ) {
377  const auto &r = _ranges.front();
378 
379  if ( !canDoRandomBlocks && lastBlockEnd ) {
380  if ( static_cast<const size_t>(r.start) != (*lastBlockEnd)+1 )
381  break;
382  }
383 
384  lastBlockEnd = r.start + r.len - 1;
385  accumulatedSize += r.len;
386 
387  blocks.push_back( std::move( _ranges.front() ) );
388  _ranges.pop_front();
389 
390  }
391  DBG_MEDIA << "Accumulated " << blocks.size() << " blocks with accumulated size of: " << accumulatedSize << "." << std::endl;
392  return blocks;
393  }
394 
395  std::vector<RangeDownloaderBaseState::Block> RangeDownloaderBaseState::getNextFailedBlocks( const std::string &urlScheme )
396  {
397  const auto prefSize = std::max<zypp::ByteCount>( _preferredChunkSize, zypp::ByteCount(4, zypp::ByteCount::K) );
398  // sort the failed requests by block number, this should make sure get them in offset order as well
399  _failedRanges.sort( []( const auto &a , const auto &b ){ return a.start < b.start; } );
400 
401  bool canDoRandomBlocks = ( zypp::str::hasPrefixCI( urlScheme, "http") );
402 
403  std::vector<Block> fblks;
404  std::optional<size_t> lastBlockEnd;
405  size_t accumulatedSize = 0;
406  while ( _failedRanges.size() ) {
407 
408  const auto &block =_failedRanges.front();
409 
410  //we need to check if we have consecutive blocks because only http mirrors support random request ranges
411  if ( !canDoRandomBlocks && lastBlockEnd ) {
412  if ( static_cast<const size_t>(block.start) != (*lastBlockEnd)+1 )
413  break;
414  }
415 
416  lastBlockEnd = block.start + block.len - 1;
417  accumulatedSize += block.len;
418 
419  fblks.push_back( std::move( _failedRanges.front() ));
420  _failedRanges.pop_front();
421 
422  fblks.back()._retryCount += 1;
423 
424  if ( accumulatedSize >= prefSize )
425  break;
426  }
427 
428  return fblks;
429  }
430 
432  {
433  // this case should never happen because we never start a multi download if we do not know the filesize beforehand
434  if ( filesize == 0 ) return 4 * 1024 * 1024;
435  else if ( filesize < 4*1024*1024 ) return filesize;
436  else if ( filesize < 8*1024*1024 ) return 4*1024*1024;
437  else if ( filesize < 16*1024*1024 ) return 8*1024*1024;
438  else if ( filesize < 256*1024*1024 ) return 10*1024*1024;
439  return 4*1024*1024;
440  }
441 }
bool assertExpectedFilesize(off_t currentFilesize)
bool isError() const
isError Will return true if this is a actual error
#define MIL
Definition: Logger.h:96
void onRequestFinished(NetworkRequest &req, const NetworkRequestError &err)
void * nativeHandle() const
Definition: request.cc:840
void onRequestProgress(NetworkRequest &, off_t, off_t, off_t, off_t)
#define DBG_MEDIA
Definition: mediadebug_p.h:28
void mirrorReceived(MirrorControl::MirrorPick mirror) override
unsigned short b
bool hasPrefixCI(const C_Str &str_r, const C_Str &prefix_r)
Definition: String.h:1030
Compute Message Digests (MD5, SHA1 etc)
Definition: Digest.h:37
Store and operate with byte count.
Definition: ByteCount.h:30
Holds transfer setting.
zypp::ByteCount downloadedByteCount() const
Returns the number of already downloaded bytes as reported by the backend.
Definition: request.cc:946
Url clearQueryString(const Url &url)
Definition: curlhelper.cc:373
void setFailed(NetworkRequestError &&err)
static std::string digestVectorToString(const UByteArray &vec)
get hex string representation of the digest vector given as parameter
Definition: Digest.cc:241
static zypp::ByteCount makeBlksize(size_t filesize)
int unlink(const Pathname &path)
Like &#39;unlink&#39;.
Definition: PathInfo.cc:700
bool addBlockRanges(std::shared_ptr< Request > req, std::vector< Block > &&blocks) const
Just initialize the requests ranges from the internal blocklist.
std::string asString() const
Error message provided by dumpOn as string.
Definition: Exception.cc:75
void cancelAll(const NetworkRequestError &err)
void onRequestStarted(NetworkRequest &)
#define WAR
Definition: Logger.h:97
The NetworkRequestError class Represents a error that occured in.
const long & ZYPP_MEDIA_CURL_DEBUG()
const long& for setting CURLOPT_DEBUGDATA Returns a reference to a static variable, so it&#39;s safe to pass ...
Definition: curlhelper.cc:36
std::vector< Block > getNextFailedBlocks(const std::string &urlScheme)
void setTimeout(long t)
set the transfer timeout
std::pair< std::vector< Url >::const_iterator, MirrorHandle > MirrorPick
NetworkRequestError setupMirror(const MirrorControl::MirrorPick &pick, Url &url, TransferSettings &set)
#define WAR_MEDIA
Definition: mediadebug_p.h:30
void handleRequestError(std::shared_ptr< Request > req, const zyppng::NetworkRequestError &err)
Base class for Exception.
Definition: Exception.h:145
unsigned short a
static const Unit K
1024 Byte
Definition: ByteCount.h:45
std::vector< Block > getNextBlocks(const std::string &urlScheme)
Type type() const
type Returns the type of the error
void addNewRequest(std::shared_ptr< Request > req, const bool connectSignals=true)
std::vector< std::shared_ptr< Request > > _runningRequests
static zyppng::NetworkRequestError customError(NetworkRequestError::Type t, std::string &&errorMsg="", std::map< std::string, boost::any > &&extraInfo={})