数据采集与融合技术第三次作业

发布时间 2023-10-30 09:00:40作者: 不_会写代码

作业一

实验内容

实现

  • 单线程方式:
    items.py:明确爬取目标 , 本题只需要一个图片的下载链接src,设置src=scrapy.Field()
import scrapy
class ScrapyAmazonItem(scrapy.Item):
    # define the fields for your item here like:
    src=scrapy.Field()
    pass

amazonspider.py:编写爬虫,用xpath'//img/@src'遍历所有图片

import scrapy
from scrapy_amazon.items import ScrapyAmazonItem

class AmazonSpider(scrapy.Spider):
    name="amazonspider"
    #url
    allowed_domains=['origin-www.amazon.cn']

    start_urls=['https://origin-www.amazon.cn/s?k=书包&page=1']
    base_url='https://origin-www.amazon.cn/s?k=书包&page='

  #起始页码
    page=1
    #先执行start_url之后再执行的方法
    def parse(self,response):#通过response参数获取到对应的响应对象
        #获取所有的图片,即获取所有img标签下的src
        lists=response.xpath('//img/@src')
        for li in lists:
            src=li.extract()
            book = ScrapyAmazonItem(src=src)
            yield book
        #翻页
        if self.page < 3:#102102110,尾号10,爬10页感觉太多了,爬二进制的10吧(doge)
            self.page = self.page+1
            url=self.base_url+str(self.page)

            yield scrapy.Request(url=url,callback=self.parse)#生成新的请求对象

pipelines.py:下载图片,并用回调函数open_spider()close_spider()计算程序运行时间(回调函数是指在爬虫运行过程中,由Scrapy自动调用的一些函数。这些函数可以在不同的时刻被调用)

from itemadapter import ItemAdapter

import os
import time
import urllib.request

class AmazonDownloadPiepline:
    def __init__(self):
        self.counter=1#这样就不会每次把counter重新初始化啦

    def open_spider(self, spider):#计算爬虫运行时间
        self.start_time = time.time()

    def close_spider(self, spider):
        elapsed_time = time.time() - self.start_time
        print(f"程序运行时间: {elapsed_time}秒")#在控制台输出单线程运行时间

    def process_item(self,item,spider):
        #获取url
        url=item.get('src')
        #创建下载文件夹
        if not os.path.exists('./pictures/'):
            os.mkdir('./pictures/')
        #在控制台输出url
        print('url'+str(self.counter)+"="+url)
        filename = f'./pictures/{self.counter}'+str(url)[-4:]#url中后四位为后缀,保留后缀,刚好图片都没有jpeg类的,不用考虑五位的情况
        urllib.request.urlretrieve(url=url, filename=filename)
        self.counter +=1
        return item

settings.py:在settings中开启管道

BOT_NAME = "scrapy_amazon"

SPIDER_MODULES = ["scrapy_amazon.spiders"]
NEWSPIDER_MODULE = "scrapy_amazon.spiders"

ITEM_PIPELINES = {
    # 管道可以有很多个,优先级的范围是1到1000 值越小优先级越高
   'scrapy_amazon.pipelines.AmazonDownloadPiepline': 300,
}
  • 运行结果:
    控制台输出url(部分):

picture文件夹内容:因为是下载所有图片,所以把一些奇怪的图标都下下来了
Image

还有这个奇怪的gif。。。

单线程运行时间如下:

  • 多线程方式:
    修改pipelines.py:增加线程池,实现下载过程的多线程
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
from itemadapter import ItemAdapter

import os
import time
import urllib.request
import threading
from queue import Queue
from scrapy.exceptions import DropItem

class AmazonDownloadPiepline:
    def __init__(self):
        self.counter=1
        self.queue = Queue()
        self.pool = []
        self.lock = threading.Lock()

        # 创建线程池
        for _ in range(5):
            thread = threading.Thread(target=self.process_item)
            self.pool.append(thread)
            thread.start()

    def open_spider(self, spider):#计算爬虫运行时间
        self.start_time = time.time()

    def close_spider(self, spider):
        self.queue.join()
        # 等待线程池中的线程结束
        for thread in self.pool:
            thread.join()
        elapsed_time = time.time() - self.start_time
        print(f"程序运行时间: {elapsed_time}秒")

    def process_item(self,item,spider):
        #获取url
        url=item.get('src')
        #创建下载文件夹
        if not os.path.exists('./pictures/'):
            os.mkdir('./pictures/')
        #在控制台输出url
        print('url'+str(self.counter)+"="+url)
        filename = f'./pictures/{self.counter}'+str(url)[-4:]#url中后四位为后缀,保留后缀,刚好图片都没有jpeg类的,不用考虑五位的情况
        urllib.request.urlretrieve(url=url, filename=filename)
        self.counter +=1
        return item

    def process_item_threaded(self, item, spider):
        # 将item添加到队列中
        self.queue.put(item)

  • 多线程运行时间:明显缩短!!

心得

  • 一开始以为修改settings.py中的CONCURRENT_REQUESTS = 32就可以,但是搜了下,Scrapy框架中,并发请求是通过异步的方式实现的,将CONCURRENT_REQUESTS的值设置为32,表示Scrapy框架可以同时处理32个请求。scrapy框架通过事件循环和回调函数的方式,在单线程中高效地处理多个请求和响应,而不是通过多线程增加速度。所以还是要在代码中增加线程。

作业二

实验内容

实现

一开始打算跟上题一样使用xpath爬取的,但是老师说这个是动态页面,xpath爬不下来,所以还是要用json。抓包过程和上次实验一样,找到jQuery,剩下步骤就是修改一下spider.py和其他scrapy框架的东西。
items.py:将一整行的信息作为列表直接yield,列表里的内容都是键值对,pipelines里传到数据库里的时候再用key提取出每一列的信息,保存到数据库,所以item.py里只设置一个序号id和一个str

import scrapy
class StocksItem(scrapy.Item):
    id=scrapy.Field()
    str=scrapy.Field()#
    pass

stocksspider.py

import scrapy
import re
from stocks.items import StocksItem

class StockSpider(scrapy.Spider):
    name="stockspider"

    allowed_domains=['84.push2.eastmoney.com']
    start_urls=["https://84.push2.eastmoney.com/api/qt/clist/get?cb=jQuery112407256076698414418_1697703074753&pn=1&pz=20&po=1&np=1&ut=bd1d9ddb04089700cf9c27f6f7426281&fltt=2&invt=2&wbp2u=|0|0|0|web&fid=f3&fs=m:0+t:6,m:0+t:80,m:1+t:2,m:1+t:23,m:0+t:81+s:2048&fields=f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f12,f13,f14,f15,f16,f17,f18,f20,f21,f23,f24,f25,f22,f11,f62,f128,f136,f115,f152&_=1697703074754"]
    #翻页pn=?
    base_url="https://84.push2.eastmoney.com/api/qt/clist/get?cb=jQuery112407256076698414418_1697703074753&pn="
    end_url= "&pz=20&po=1&np=1&ut=bd1d9ddb04089700cf9c27f6f7426281&fltt=2&invt=2&wbp2u=|0|0|0|web&fid=f3&fs=m:0+t:6,m:0+t:80,m:1+t:2,m:1+t:23,m:0+t:81+s:2048&fields=f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f12,f13,f14,f15,f16,f17,f18,f20,f21,f23,f24,f25,f22,f11,f62,f128,f136,f115,f152&_=1697703074754"
    page=1
    id=0#序号
    def parse(self,response):
        response=response.text
        pat='diff":\[(.*?)\]'#用正则表达式
        res=re.compile(pat,re.S).findall(response)
        datas = list(eval(res[0]))  # data[0]是str类型,要转成一个列表
        for d in datas:#按行
            self.id += 1#json串里没有找到关于序号的,自己设置
            item= StocksItem(id=self.id,str=d)
            yield item

        #翻页
        if self.page < 3:#同样翻3页
            self.page += 1
            url = self.base_url + str(self.page)+self.end_url
            print(url)

            yield scrapy.Request(url=url, callback=self.parse)  # 生成新的请求对象

pipelines.py:在pipelines.py里创建数据库,并保存到数据库。一定要记得加self.con.commit() 提交事务并关闭数据库连接。

import sqlite3

class StocksPipeline:
    def __init__(self):#只会在初始化对象的时候运行一次。
        self.con = sqlite3.connect("stocks_scrapy.db")
        self.cursor = self.con.cursor()
        try:#创建表
            self.cursor.execute(
                "create table stocks_scrapy (ID int, stock_code varchar,stock_name varchar,latest_price varchar,change_percent varchar,change_amount varchar,volume varchar ,turnover varchar,amplitude varchar,highest varchar,lowest varchar,open_price varchar,close_price varchar)")
        except:
            self.cursor.execute("delete from stocks_scrapy")

    def process_item(self,item,spider):
        try:
            d=item.get('str')
            id=item.get('id')
            self.con = sqlite3.connect("stocks_scrapy.db")#连接数据库!!
            self.cursor = self.con.cursor()
            self.cursor.execute(
                "insert into stocks_scrapy (ID,stock_code,stock_name,latest_price,change_percent,change_amount,volume,turnover,amplitude,highest,lowest,open_price,close_price) values (?,?,?,?,?,?,?,?,?,?,?,?,?)",
                (id, d['f12'], d['f14'], d['f2'], d['f3'], d['f4'], d['f5'],d['f6'],d['f7'], d['f15'],
                                     d['f16'], d['f17'], d['f18']))
            self.con.commit()#使用 self.con.commit() 提交事务并关闭数据库连接。
        except Exception as err:
            print(err)
        return item

settings.py:在settings中开启管道

BOT_NAME = "stocks"

SPIDER_MODULES = ["stocks.spiders"]
NEWSPIDER_MODULE = "stocks.spiders"

ITEM_PIPELINES = {
    # 管道可以有很多个且是有优先级 优先级的范围是1到1000 值越小优先级越高
   'stocks.pipelines.StocksPipeline': 300
}
  • 运行结果:
    在Navicat中查看表的内容

心得

  • 爬取的过程还是简单一点的,修改一下上次作业的就行,但是传到数据库的过程调了非常久,一开始是尝试在spider里创建数据库,插入数据,但是没成功。后面将所有存到数据库的工作都放在pipelines里,运行起来了,没有报错,但是数据库一直是空的。最后发现是忘记连接数据库,还忘记写self.con.commit() 提交事务并关闭数据库连接,就差这一句!!!!!!!!!改了好久。还是对scrapy框架的原理不够了解,运行过程摸不清。

作业三

实验内容

实现

items.py:明确要爬取的东西

import scrapy
class CurrencyItem(scrapy.Item):
    Currency=scrapy.Field()#货币名称
    TBP=scrapy.Field()#现汇买入价
    CBP=scrapy.Field()#现钞买入价
    TSP=scrapy.Field()#现汇卖出价
    CSP=scrapy.Field()#现钞卖出价
    Time=scrapy.Field()#发布时间
    pass

首先观察网页结果,很明显是表格形式。

Image

直接爬取每个tbody下的tr标签,使用xpath helper会发现第一个tr标签下不是我想要的内容。
Image

于是从第二个开始获取tr标签下的每个td标签,在xpath helper中是成功获取了想要的信息的。
Image

此时愉快的开始爬取,爬出来的是空的,逐步检查,排除了一些小错误后还是空的。以为是没有加headers,于是在middlewares.py中增加了User-Agent的信息。
Image

还是空的。。。改变思路直接输出trs=response.xpath('//tbody')还是空的,再输出trs=response.xpath('//html')中的内容又可以看见我想要的,所以排除是动态加载的页面。突发奇想直接爬取所有的tr标签,就是会多一些奇怪的东西,限制一下遍历trs的范围,以及发现个别行的内容存在空的情况,加一个判断将控制设为none,最终的spider代码如下。
currencypider.py

'''
爬取外汇网站数据
http://www.boc.cn/sourcedb/whpj/
Currency TBP CBP TSP CSP Time
'''
import scrapy
from currency.items import CurrencyItem

class CurrencySpider(scrapy.Spider):
    name="CurrencySpider"
    allowed_domains=['www.boc.cn']
    start_urls=['http://www.boc.cn/sourcedb/whpj/']
    base_url='https://www.boc.cn/sourcedb/whpj/index_'
    end_url='.html'
    page=0
    def parse(self,response):
        trs=response.xpath('//tr')#所有的tr标签
        for tr in trs[2:29]:#遍历tr标签下的每个td,tr从第二个开始才是要的信息
            Currency=tr.xpath('./td[1]/text()').extract_first()
            if tr.xpath('./td[2]/text()').extract_first() is not None:
                TBP=tr.xpath('./td[2]/text()').extract_first()
            else:TBP='none'

            if tr.xpath('./td[3]/text()').extract_first() is not None:
                CBP=tr.xpath('./td[3]/text()').extract_first()
            else:CBP='none'

            if tr.xpath('./td[4]/text()').extract_first() is not None:
                TSP=tr.xpath('./td[4]/text()').extract_first()
            else:TSP='none'
            if tr.xpath('./td[5]/text()').extract_first() is not None:
                CSP=tr.xpath('./td[5]/text()').extract_first()
            else:CSP='none'
            Time=tr.xpath('./td[7]/text()').extract_first()
            book=CurrencyItem(Currency=Currency,TBP=TBP,CBP=CBP,TSP=TSP,CSP=CSP,Time=Time)
            yield book

        if self.page < 2:#翻三页,该网页从0开始计算页数
            self.page+=1
            url=self.base_url+str(self.page)+self.end_url
            yield scrapy.Request(url=url,callback=self.parse)

pipelines.py:将数据存储到数据库中,跟上一题大差不差,就是又忘记写self.con.commit()

from itemadapter import ItemAdapter

import sqlite3
class CurrencyPipeline:
    def __init__(self):#只会在初始化对象的时候运行一次,创建表
        self.con = sqlite3.connect("Currency.db")
        self.cursor = self.con.cursor()
        try:
            self.cursor.execute(
                "create table Currency (Currency varchar,TBP varchar,CBP varchar, TSP varchar,CSP varchar,Time varchar)")
        except:
            self.cursor.execute("delete from Currency")
    def process_item(self, item, spider):
        try:
            Currency=item.get('Currency')
            TBP=item.get('TBP')
            CBP=item.get('CBP')
            TSP=item.get('TSP')
            CSP=item.get('CSP')
            Time=item.get('Time')
            #连接数据库
            self.con = sqlite3.connect("Currency.db")
            self.cursor = self.con.cursor()
            #插入数据
            self.cursor.execute(
                "insert into Currency (Currency,TBP,CBP,TSP,CSP,Time) values(?,?,?,?,?,?)",(
str(Currency),str(TBP),str(CBP),str(TSP),str(CSP),str(Time)))
            self.con.commit()

        except Exception as err:
            print(err)
        return item

settings.py

BOT_NAME = "currency"

SPIDER_MODULES = ["currency.spiders"]
NEWSPIDER_MODULE = "currency.spiders"

ITEM_PIPELINES = {
    "currency.pipelines.CurrencyPipeline": 300,
}
  • 运行结果((部分):

心得

  • 一样的错犯了两次,下次一定会记得写self.con.commit()。。。。
  • extract()不管有几个都会返回一个列表,所以输出来很丑,还不能直接存进数据库,用extract_first()返回的就是一个字符串了。