我有一个小工具,我用来从一个网站上下载一个MP3文件,然后构建/更新一个播客XML文件,我已经添加到iTunes。

创建/更新XML文件的文本处理是用Python编写的。但是,我在Windows .bat文件中使用wget来下载实际的MP3文件。我更喜欢用Python编写整个实用程序。

我努力寻找一种用Python实际下载该文件的方法,因此我使用了wget。

那么,如何使用Python下载文件呢?


当前回答

我想从网页上下载所有的文件。我尝试了wget,但它失败了,所以我决定使用Python路由,我找到了这个线程。

读完之后,我做了一个小的命令行应用程序,soupget,扩展了PabloG和Stan的优秀答案,并添加了一些有用的选项。

它使用BeatifulSoup收集页面的所有url,然后下载具有所需扩展名的url。最后,它可以同时下载多个文件。

下面就是:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import (division, absolute_import, print_function, unicode_literals)
import sys, os, argparse
from bs4 import BeautifulSoup

# --- insert Stan's script here ---
# if sys.version_info >= (3,): 
#...
#...
# def download_file(url, dest=None): 
#...
#...

# --- new stuff ---
def collect_all_url(page_url, extensions):
    """
    Recovers all links in page_url checking for all the desired extensions
    """
    conn = urllib2.urlopen(page_url)
    html = conn.read()
    soup = BeautifulSoup(html, 'lxml')
    links = soup.find_all('a')

    results = []    
    for tag in links:
        link = tag.get('href', None)
        if link is not None: 
            for e in extensions:
                if e in link:
                    # Fallback for badly defined links
                    # checks for missing scheme or netloc
                    if bool(urlparse.urlparse(link).scheme) and bool(urlparse.urlparse(link).netloc):
                        results.append(link)
                    else:
                        new_url=urlparse.urljoin(page_url,link)                        
                        results.append(new_url)
    return results

if __name__ == "__main__":  # Only run if this file is called directly
    # Command line arguments
    parser = argparse.ArgumentParser(
        description='Download all files from a webpage.')
    parser.add_argument(
        '-u', '--url', 
        help='Page url to request')
    parser.add_argument(
        '-e', '--ext', 
        nargs='+',
        help='Extension(s) to find')    
    parser.add_argument(
        '-d', '--dest', 
        default=None,
        help='Destination where to save the files')
    parser.add_argument(
        '-p', '--par', 
        action='store_true', default=False, 
        help="Turns on parallel download")
    args = parser.parse_args()

    # Recover files to download
    all_links = collect_all_url(args.url, args.ext)

    # Download
    if not args.par:
        for l in all_links:
            try:
                filename = download_file(l, args.dest)
                print(l)
            except Exception as e:
                print("Error while downloading: {}".format(e))
    else:
        from multiprocessing.pool import ThreadPool
        results = ThreadPool(10).imap_unordered(
            lambda x: download_file(x, args.dest), all_links)
        for p in results:
            print(p)

它的用法示例如下:

python3 soupget.py -p -e <list of extensions> -d <destination_folder> -u <target_webpage>

还有一个实际的例子,如果你想看到它的作用:

python3 soupget.py -p -e .xlsx .pdf .csv -u https://healthdata.gov/dataset/chemicals-cosmetics

其他回答

我写了下面的代码,它可以在普通的Python 2或Python 3中工作。


import sys
try:
    import urllib.request
    python3 = True
except ImportError:
    import urllib2
    python3 = False


def progress_callback_simple(downloaded,total):
    sys.stdout.write(
        "\r" +
        (len(str(total))-len(str(downloaded)))*" " + str(downloaded) + "/%d"%total +
        " [%3.2f%%]"%(100.0*float(downloaded)/float(total))
    )
    sys.stdout.flush()

def download(srcurl, dstfilepath, progress_callback=None, block_size=8192):
    def _download_helper(response, out_file, file_size):
        if progress_callback!=None: progress_callback(0,file_size)
        if block_size == None:
            buffer = response.read()
            out_file.write(buffer)

            if progress_callback!=None: progress_callback(file_size,file_size)
        else:
            file_size_dl = 0
            while True:
                buffer = response.read(block_size)
                if not buffer: break

                file_size_dl += len(buffer)
                out_file.write(buffer)

                if progress_callback!=None: progress_callback(file_size_dl,file_size)
    with open(dstfilepath,"wb") as out_file:
        if python3:
            with urllib.request.urlopen(srcurl) as response:
                file_size = int(response.getheader("Content-Length"))
                _download_helper(response,out_file,file_size)
        else:
            response = urllib2.urlopen(srcurl)
            meta = response.info()
            file_size = int(meta.getheaders("Content-Length")[0])
            _download_helper(response,out_file,file_size)

import traceback
try:
    download(
        "https://geometrian.com/data/programming/projects/glLib/glLib%20Reloaded%200.5.9/0.5.9.zip",
        "output.zip",
        progress_callback_simple
    )
except:
    traceback.print_exc()
    input()

注:

支持“进度条”回调。 从我的网站上下载一个4 MB的测试包。zip。

还有一个,使用urlretrieve:

import urllib.request
urllib.request.urlretrieve("http://www.example.com/songs/mp3.mp3", "mp3.mp3")

(对于Python 2使用import urllib和urllib.urlretrieve)

如果速度对你来说很重要,我为urllib和wget模块做了一个小的性能测试,关于wget,我尝试了一次状态栏和一次没有状态栏。我使用了三个不同的500MB文件进行测试(不同的文件-以消除在底层进行缓存的可能性)。在debian机器上测试,使用python2。

首先,这些是结果(它们在不同的运行中是相似的):

$ python wget_test.py 
urlretrive_test : starting
urlretrive_test : 6.56
==============
wget_no_bar_test : starting
wget_no_bar_test : 7.20
==============
wget_with_bar_test : starting
100% [......................................................................] 541335552 / 541335552
wget_with_bar_test : 50.49
==============

我执行测试的方式是使用“profile”装饰器。这是完整的代码:

import wget
import urllib
import time
from functools import wraps

def profile(func):
    @wraps(func)
    def inner(*args):
        print func.__name__, ": starting"
        start = time.time()
        ret = func(*args)
        end = time.time()
        print func.__name__, ": {:.2f}".format(end - start)
        return ret
    return inner

url1 = 'http://host.com/500a.iso'
url2 = 'http://host.com/500b.iso'
url3 = 'http://host.com/500c.iso'

def do_nothing(*args):
    pass

@profile
def urlretrive_test(url):
    return urllib.urlretrieve(url)

@profile
def wget_no_bar_test(url):
    return wget.download(url, out='/tmp/', bar=do_nothing)

@profile
def wget_with_bar_test(url):
    return wget.download(url, out='/tmp/')

urlretrive_test(url1)
print '=============='
time.sleep(1)

wget_no_bar_test(url2)
print '=============='
time.sleep(1)

wget_with_bar_test(url3)
print '=============='
time.sleep(1)

Urllib似乎是最快的

我想从网页上下载所有的文件。我尝试了wget,但它失败了,所以我决定使用Python路由,我找到了这个线程。

读完之后,我做了一个小的命令行应用程序,soupget,扩展了PabloG和Stan的优秀答案,并添加了一些有用的选项。

它使用BeatifulSoup收集页面的所有url,然后下载具有所需扩展名的url。最后,它可以同时下载多个文件。

下面就是:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import (division, absolute_import, print_function, unicode_literals)
import sys, os, argparse
from bs4 import BeautifulSoup

# --- insert Stan's script here ---
# if sys.version_info >= (3,): 
#...
#...
# def download_file(url, dest=None): 
#...
#...

# --- new stuff ---
def collect_all_url(page_url, extensions):
    """
    Recovers all links in page_url checking for all the desired extensions
    """
    conn = urllib2.urlopen(page_url)
    html = conn.read()
    soup = BeautifulSoup(html, 'lxml')
    links = soup.find_all('a')

    results = []    
    for tag in links:
        link = tag.get('href', None)
        if link is not None: 
            for e in extensions:
                if e in link:
                    # Fallback for badly defined links
                    # checks for missing scheme or netloc
                    if bool(urlparse.urlparse(link).scheme) and bool(urlparse.urlparse(link).netloc):
                        results.append(link)
                    else:
                        new_url=urlparse.urljoin(page_url,link)                        
                        results.append(new_url)
    return results

if __name__ == "__main__":  # Only run if this file is called directly
    # Command line arguments
    parser = argparse.ArgumentParser(
        description='Download all files from a webpage.')
    parser.add_argument(
        '-u', '--url', 
        help='Page url to request')
    parser.add_argument(
        '-e', '--ext', 
        nargs='+',
        help='Extension(s) to find')    
    parser.add_argument(
        '-d', '--dest', 
        default=None,
        help='Destination where to save the files')
    parser.add_argument(
        '-p', '--par', 
        action='store_true', default=False, 
        help="Turns on parallel download")
    args = parser.parse_args()

    # Recover files to download
    all_links = collect_all_url(args.url, args.ext)

    # Download
    if not args.par:
        for l in all_links:
            try:
                filename = download_file(l, args.dest)
                print(l)
            except Exception as e:
                print("Error while downloading: {}".format(e))
    else:
        from multiprocessing.pool import ThreadPool
        results = ThreadPool(10).imap_unordered(
            lambda x: download_file(x, args.dest), all_links)
        for p in results:
            print(p)

它的用法示例如下:

python3 soupget.py -p -e <list of extensions> -d <destination_folder> -u <target_webpage>

还有一个实际的例子,如果你想看到它的作用:

python3 soupget.py -p -e .xlsx .pdf .csv -u https://healthdata.gov/dataset/chemicals-cosmetics

Python 2/3的PabloG代码的改进版本:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import ( division, absolute_import, print_function, unicode_literals )

import sys, os, tempfile, logging

if sys.version_info >= (3,):
    import urllib.request as urllib2
    import urllib.parse as urlparse
else:
    import urllib2
    import urlparse

def download_file(url, dest=None):
    """ 
    Download and save a file specified by url to dest directory,
    """
    u = urllib2.urlopen(url)

    scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
    filename = os.path.basename(path)
    if not filename:
        filename = 'downloaded.file'
    if dest:
        filename = os.path.join(dest, filename)

    with open(filename, 'wb') as f:
        meta = u.info()
        meta_func = meta.getheaders if hasattr(meta, 'getheaders') else meta.get_all
        meta_length = meta_func("Content-Length")
        file_size = None
        if meta_length:
            file_size = int(meta_length[0])
        print("Downloading: {0} Bytes: {1}".format(url, file_size))

        file_size_dl = 0
        block_sz = 8192
        while True:
            buffer = u.read(block_sz)
            if not buffer:
                break

            file_size_dl += len(buffer)
            f.write(buffer)

            status = "{0:16}".format(file_size_dl)
            if file_size:
                status += "   [{0:6.2f}%]".format(file_size_dl * 100 / file_size)
            status += chr(13)
            print(status, end="")
        print()

    return filename

if __name__ == "__main__":  # Only run if this file is called directly
    print("Testing with 10MB download")
    url = "http://download.thinkbroadband.com/10MB.zip"
    filename = download_file(url)
    print(filename)