threading.local类

    要实现将资源和持有资源的线程进行绑定的操作,最简单的做法就是使用模块的local类,在网络爬虫开发中,就可以使用local类为每个线程绑定一个MySQL数据库连接或Redis客户端对象,这样通过线程可以直接获得这些资源,既解决了资源竞争的问题,又避免了在函数和方法调用时传递这些资源。具体的请参考本章多线程爬取“手机搜狐网”(Redis版)的实例代码。

    concurrent.futures模块

    Python3.2带来了concurrent.futures 模块,这个模块包含了线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。关于这部分的内容推荐大家阅读。

    分布式进程

    协程的概念

    协程(coroutine)通常又称之为微线程或纤程,它是相互协作的一组子程序(函数)。所谓相互协作指的是在执行函数A时,可以随时中断去执行函数B,然后又中断继续执行函数A。注意,这一过程并不是函数调用(因为没有调用语句),整个过程看似像多线程,然而协程只有一个线程执行。协程通过yield关键字和 send()操作来转移执行权,协程之间不是调用者与被调用者的关系。

    协程的优势在于以下两点:

    1. 执行效率极高,因为子程序(函数)切换不是线程切换,由程序自身控制,没有切换线程的开销。
    2. 不需要多线程的锁机制,因为只有一个线程,也不存在竞争资源的问题,当然也就不需要对资源加锁保护,因此执行效率高很多。

    历史回顾

    1. Python 2.2:第一次提出了生成器(最初称之为迭代器)的概念(PEP 255)。
    2. Python 2.5:引入了将对象发送回暂停了的生成器这一特性即生成器的send()方法(PEP 342)。
    3. Python 3.3:添加了yield from特性,允许从迭代器中返回任何值(注意生成器本身也是迭代器),这样我们就可以串联生成器并且重构出更好的生成器。
    4. Python 3.4:引入asyncio.coroutine装饰器用来标记作为协程的函数,协程函数和asyncio及其事件循环一起使用,来实现异步I/O操作。
    5. Python 3.5:引入了asyncawait,可以使用async def来定义一个协程函数,这个函数中不能包含任何形式的yield语句,但是可以使用returnawait从协程中返回值。

    协程实现了协作式并发,通过提高CPU的利用率来达到改善性能的目的。著名的三方库就是通过协程的方式实现了HTTP客户端和HTTP服务器的功能,较之requests有更好的获取数据的性能,有兴趣可以阅读它的官方文档

    下面我们把之间讲的所有知识结合起来,用面向对象的方式实现一个爬取“手机搜狐网”的多线程爬虫。

    1. import pickle
    2. import zlib
    3. from enum import Enum, unique
    4. from hashlib import sha1
    5. from random import random
    6. from threading import Thread, current_thread, local
    7. from time import sleep
    8. from urllib.parse import urlparse
    9. import pymongo
    10. import redis
    11. import requests
    12. from bs4 import BeautifulSoup
    13. from bson import Binary
    14. @unique
    15. class SpiderStatus(Enum):
    16. IDLE = 0
    17. WORKING = 1
    18. def decode_page(page_bytes, charsets=('utf-8',)):
    19. page_html = None
    20. for charset in charsets:
    21. try:
    22. page_html = page_bytes.decode(charset)
    23. break
    24. except UnicodeDecodeError:
    25. pass
    26. return page_html
    27. class Retry(object):
    28. def __init__(self, *, retry_times=3,
    29. wait_secs=5, errors=(Exception, )):
    30. self.retry_times = retry_times
    31. self.errors = errors
    32. def wrapper(*args, **kwargs):
    33. for _ in range(self.retry_times):
    34. try:
    35. return fn(*args, **kwargs)
    36. except self.errors as e:
    37. print(e)
    38. sleep((random() + 1) * self.wait_secs)
    39. return None
    40. return wrapper
    41. class Spider(object):
    42. def __init__(self):
    43. self.status = SpiderStatus.IDLE
    44. @Retry()
    45. def fetch(self, current_url, *, charsets=('utf-8', ),
    46. user_agent=None, proxies=None):
    47. thread_name = current_thread().name
    48. print(f'[{thread_name}]: {current_url}')
    49. headers = {'user-agent': user_agent} if user_agent else {}
    50. resp = requests.get(current_url,
    51. headers=headers, proxies=proxies)
    52. return decode_page(resp.content, charsets) \
    53. if resp.status_code == 200 else None
    54. def parse(self, html_page, *, domain='m.sohu.com'):
    55. soup = BeautifulSoup(html_page, 'lxml')
    56. for a_tag in soup.body.select('a[href]'):
    57. parser = urlparse(a_tag.attrs['href'])
    58. scheme = parser.scheme or 'http'
    59. netloc = parser.netloc or domain
    60. if scheme != 'javascript' and netloc == domain:
    61. path = parser.path
    62. query = '?' + parser.query if parser.query else ''
    63. full_url = f'{scheme}://{netloc}{path}{query}'
    64. redis_client = thread_local.redis_client
    65. if not redis_client.sismember('visited_urls', full_url):
    66. redis_client.rpush('m_sohu_task', full_url)
    67. def extract(self, html_page):
    68. pass
    69. def store(self, data_dict):
    70. # redis_client = thread_local.redis_client
    71. # mongo_db = thread_local.mongo_db
    72. pass
    73. class SpiderThread(Thread):
    74. def __init__(self, name, spider):
    75. def run(self):
    76. redis_client = redis.Redis(host='1.2.3.4', port=6379, password='1qaz2wsx')
    77. mongo_client = pymongo.MongoClient(host='1.2.3.4', port=27017)
    78. thread_local.redis_client = redis_client
    79. thread_local.mongo_db = mongo_client.msohu
    80. while True:
    81. current_url = redis_client.lpop('m_sohu_task')
    82. while not current_url:
    83. current_url = redis_client.lpop('m_sohu_task')
    84. self.spider.status = SpiderStatus.WORKING
    85. current_url = current_url.decode('utf-8')
    86. if not redis_client.sismember('visited_urls', current_url):
    87. redis_client.sadd('visited_urls', current_url)
    88. html_page = self.spider.fetch(current_url)
    89. if html_page not in [None, '']:
    90. hasher = hasher_proto.copy()
    91. hasher.update(current_url.encode('utf-8'))
    92. doc_id = hasher.hexdigest()
    93. sohu_data_coll = mongo_client.msohu.webpages
    94. if not sohu_data_coll.find_one({'_id': doc_id}):
    95. sohu_data_coll.insert_one({
    96. '_id': doc_id,
    97. 'url': current_url,
    98. 'page': Binary(zlib.compress(pickle.dumps(html_page)))
    99. })
    100. self.spider.parse(html_page)
    101. self.spider.status = SpiderStatus.IDLE
    102. def is_any_alive(spider_threads):
    103. return any([spider_thread.spider.status == SpiderStatus.WORKING
    104. for spider_thread in spider_threads])
    105. thread_local = local()
    106. hasher_proto = sha1()
    107. def main():
    108. redis_client = redis.Redis(host='1.2.3.4', port=6379, password='1qaz2wsx')
    109. if not redis_client.exists('m_sohu_task'):
    110. redis_client.rpush('m_sohu_task', 'http://m.sohu.com/')
    111. spider_threads = [SpiderThread('thread-%d' % i, Spider())
    112. for i in range(10)]
    113. for spider_thread in spider_threads:
    114. spider_thread.start()
    115. while redis_client.exists('m_sohu_task') or is_any_alive(spider_threads):
    116. sleep(5)
    117. print('Over!')
    118. if __name__ == '__main__':