Managing Hubstorage Crawl Frontiers
pip install shub-workflow
pip install hcf-backend
pip install scrapy-frontera
For big crawls, a single spider job is usually problematic. If the spider stops for any reason, you have to recrawl from zero, losing time and wasting resources. If you are reading this documentation, I don't have to explain the pain that stops cause in huge crawls run as a single job: you are most probably here because you are looking for a better approach for your project.
A crawl frontier is a store of requests that can be filled and consumed progressively by different processes. ScrapyCloud provides a native crawl frontier, the Hubstorage Crawl Frontier (HCF). We provide an HCF backend for Frontera, which in turn is a framework for developing crawl frontier handlers (backends) by establishing both a common protocol and a common interface, plus some extra facilities. For scrapy spiders, the equation is completed with scrapy-frontera, which provides a scrapy scheduler that supports Frontera without losing any native behavior of the original scrapy scheduler, so migrating a spider to Frontera becomes painless.
When working with spiders we can identify two distinct roles: a producer role and a consumer role, depending on whether the spider writes requests to a requests queue or reads them from it in order to actually process them. Usual scrapy spiders assume both roles at the same time, transparently: the same spider writes requests to a local memory/disk queue, and later reads them from it. We don't usually think of a scrapy spider as a producer/consumer because it natively works in both ways at the same time. But that is the logic behind the native scrapy scheduler, which takes care of:
- Queuing requests coming from spider, by default into a dual memory/disk queue
- Dequeuing requests into the downloader, under specific prioritization rules.
On the other hand, the scheduler implemented in the scrapy-frontera package, which is a subclass of the scrapy scheduler, provides an interface with Frontera and allows you, either from spider code or through project settings, to send specific requests to the Frontera backend. In this way, the Frontera backend assumes the responsibility for queuing, dequeuing and prioritizing the requests sent to or read from it, while the remaining requests follow the usual flow within Scrapy. The scheduler still asks for requests from the Frontera backend, adapts them and puts them into its local queues. It is important to notice that the scheduler asks for requests from the backend only if its own local queues are empty. So, requests in local queues always have priority over those in the Frontera backend queues.
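The dequeuing order described above can be illustrated with a minimal, self-contained sketch. The names `ToyFrontierBackend`, `ToyScheduler` and `next_request` are hypothetical and are not the scrapy-frontera API; the sketch only shows the frontier backend being consulted when the local queue is empty.

```python
from collections import deque


class ToyFrontierBackend:
    """Hypothetical stand-in for a Frontera backend: hands out requests in batches."""

    def __init__(self, requests, batch_size=2):
        self._pending = deque(requests)
        self._batch_size = batch_size
        self.calls = 0  # how many times the scheduler asked for requests

    def get_next_requests(self):
        self.calls += 1
        batch = []
        while self._pending and len(batch) < self._batch_size:
            batch.append(self._pending.popleft())
        return batch


class ToyScheduler:
    """Hypothetical scheduler: local queue first, frontier only when it is empty."""

    def __init__(self, backend):
        self.backend = backend
        self.local = deque()

    def enqueue_local(self, request):
        self.local.append(request)

    def next_request(self):
        if not self.local:
            # Local queue exhausted: pull a batch from the frontier backend
            self.local.extend(self.backend.get_next_requests())
        return self.local.popleft() if self.local else None


backend = ToyFrontierBackend(["frontier-1", "frontier-2", "frontier-3"])
scheduler = ToyScheduler(backend)
scheduler.enqueue_local("local-1")
scheduler.enqueue_local("local-2")

order = []
while True:
    request = scheduler.next_request()
    if request is None:
        break
    order.append(request)
print(order)
```

In this toy run the two local requests are served first, and the backend is only asked for a batch once the local queue has been drained.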
So, when using an external frontier like HCF, we can separate the producer and consumer roles into different spiders, and this division of roles becomes more evident. Whether we separate the roles or not depends on our specific implementation. It is possible to adapt a scrapy spider to work with HCF with minimal effort, both as producer and consumer. However, separating the roles into separate spiders (a producer spider and a consumer spider) is almost always much more convenient. It leads to a more maintainable and easier to handle project architecture, as we will see.
Each ScrapyCloud project has its own HCF space, separated from the rest. Within a project, HCF can be subdivided via frontier names, which in turn can be subdivided into slots via slot names. Frontier names and slot names are created on the fly by the producer agents. hcf-backend comes with a handy tool for managing (deleting, listing, dumping, counting, etc.) HCF objects, hcfpal.py:
PROJECT_ID=<project id> python -m hcf_backend.utils.hcfpal
If you need to make it available in a project deployed on ScrapyCloud you need, as usual, to define your own script, e.g. scripts/hcfpal.py:
from hcf_backend.utils.hcfpal import HCFPalScript
if __name__ == '__main__':
    script = HCFPalScript()
    script.run()
Follow the command line help for usage instructions.
The first step is to add scrapy-frontera and hcf-backend to your project requirements. Then, configure your project to work with scrapy-frontera. From the scrapy-frontera README, this is the basic configuration you need to apply in order to replace the native Scrapy scheduler with the scrapy-frontera one:
# shut up frontera DEBUG flooding
import logging
logging.getLogger("manager.components").setLevel(logging.INFO)
logging.getLogger("manager").setLevel(logging.INFO)
SCHEDULER = 'scrapy_frontera.scheduler.FronteraScheduler'
BACKEND = 'hcf_backend.HCFBackend'
DOWNLOADER_MIDDLEWARES = {
    'scrapy_frontera.middlewares.SchedulerDownloaderMiddleware': 0,
}
SPIDER_MIDDLEWARES = {
    'scrapy_frontera.middlewares.SchedulerSpiderMiddleware': 0,
}
These changes alone will not affect the behavior of a scrapy project, but they will allow you to implement spiders that work with HCF. Let's suppose we have the following spider:
from scrapy import Spider, Request
from scrapy.utils.request import request_fingerprint
from scrapy.utils.httpobj import urlparse_cached
class MyBroadCrawlSpider(Spider):
    name = 'mybroadcrawlspider'
    custom_settings = {
        'CONCURRENT_REQUESTS_PER_IP': 2,
    }
    def start_requests(self):
        if hasattr(self, 'start_url'):
            yield Request(url=self.start_url)
    def parse(self, response):
        urls = {}
        for link in response.xpath('//a/@href').extract():
            url = response.urljoin(link)
            request = Request(url)
            if urlparse_cached(request).scheme in ('http', 'https'):
                fp = request_fingerprint(request)
                if fp not in urls:
                    urls[fp] = url
                    yield request
        yield {
            "url": response.url,
            "title": (response.xpath('//title/text()').extract() + [''])[0].strip(),
            "urls": list(urls.values()),
        }
This is a broad crawl spider that accepts a starting url via a spider argument (start_url), so in order to run it:
scrapy crawl mybroadcrawlspider -a start_url=<a starting url> -o items.jl
Ideally this spider would run forever while crawling the entire web. But it will certainly stop when the operating system terminates it once there is not enough memory or disk available for storing all the generated requests and items, unless you set some CLOSESPIDER_* limit or stop it manually. So, we have a usual scrapy spider here. Nothing new. A controlled approach can be implemented using frontier requests. For this, we only need to add some extra settings to the spider code:
class MyBroadCrawlSpider(Spider):
    name = 'mybroadcrawlspider'
    frontera_settings = {
        'HCF_PRODUCER_FRONTIER': name,
        'HCF_CONSUMER_FRONTIER': name,
    }
    custom_settings = {
        'CONCURRENT_REQUESTS_PER_IP': 2,
        'FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER': ['parse'],
    }
    (...)
Or you can even pass the additional settings via command line, without any change in code, either via SCRAPY_ prefixed environment variables as usual or, in the case of frontera settings, via the spider argument frontera_settings_json (see the scrapy-frontera README).
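As a rough sketch of the command line route, the helper below builds a `scrapy crawl` invocation that carries the frontera settings as a JSON string. The helper name `crawl_command` is hypothetical, and it assumes that the `frontera_settings_json` spider argument takes a JSON-encoded dict; check the scrapy-frontera README for the exact contract.

```python
import json
import shlex


def crawl_command(spider_name, frontera_settings, **spider_args):
    """Build a `scrapy crawl` command line that carries frontera settings as JSON."""
    argv = ["scrapy", "crawl", spider_name]
    for key, value in spider_args.items():
        argv += ["-a", f"{key}={value}"]
    # Assumption: the spider argument value is a JSON-encoded dict of frontera settings
    argv += ["-a", "frontera_settings_json=" + json.dumps(frontera_settings)]
    # shlex.join quotes each argument so the JSON survives a POSIX shell
    return shlex.join(argv)


frontera_settings = {
    "HCF_PRODUCER_FRONTIER": "mybroadcrawlspider",
    "HCF_CONSUMER_FRONTIER": "mybroadcrawlspider",
}
command = crawl_command("mybroadcrawlspider", frontera_settings,
                        start_url="https://example.com")
print(command)
```

Building the argument with `json.dumps` and quoting it with `shlex.join` avoids hand-escaping the braces and double quotes in the shell.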
In the frontera settings above you could also include the HCF_PROJECT_ID setting in order to explicitly indicate the ScrapyCloud project space of the HCF we will work on, as the scrapy-frontera README indicates. However, unless you really need to force the project, it is better to let the backend autodetect it. In this way, the project space used will be that of the project where the spider is running, in case it runs on ScrapyCloud. Or, if you want to run a test in your local environment, you can set the PROJECT_ID environment variable in order to indicate the project.
Whether you run the spider on ScrapyCloud or locally, you need to set the ScrapyCloud API key for access to the HCF. One way is via the setting HCF_AUTH, but it is often more convenient to set the environment variable SH_APIKEY, because this is the standard way the scrapinghub python bindings library works. Deployment of environment variables into a ScrapyCloud project can be done via the project image Dockerfile.
The hcf setting HCF_PRODUCER_FRONTIER configures the spider as a producer. The hcf setting HCF_CONSUMER_FRONTIER configures it as a consumer. The specified frontier names can be anything, but it is a good practice to use the spider name, as in this example. And of course, if your spider will consume requests from the same frontier it writes to, both settings need to be set to the same value.
The setting FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER establishes that every request whose callback name is in the provided list (except for the requests generated by start_requests()) will be sent to the frontier.
Notice that, while the hcf settings are added to the frontera_settings dict, the last one is added to the custom_settings dict. This is because the hcf backend resides on the frontera side, while FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER is a scrapy scheduler setting, and so it is read on the scrapy side. Optionally, you can place everything under custom_settings only, but separation of scopes is a good practice.
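The routing rule behind FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER can be summarized with a toy predicate. The function `goes_to_frontier` is hypothetical and only restates the rule described in the text; it is not the scheduler's actual code.

```python
def goes_to_frontier(callback_name, from_start_requests, callbacks_to_frontier):
    """Toy version of the routing rule: True if the request is sent to the frontier."""
    if from_start_requests:
        # Requests generated by start_requests() stay in the local scheduler queues
        return False
    return callback_name in callbacks_to_frontier


# Value of FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER in the example spider
callbacks_to_frontier = ['parse']

routes = {
    "callback 'parse'": goes_to_frontier('parse', False, callbacks_to_frontier),
    "callback 'parse', from start_requests()": goes_to_frontier('parse', True, callbacks_to_frontier),
    "callback 'parse_other'": goes_to_frontier('parse_other', False, callbacks_to_frontier),
}
for description, to_frontier in routes.items():
    print(description, '->', 'frontier' if to_frontier else 'local queue')
```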
Running the spider configured in this way is the same as before, except that you need to set the environment variables PROJECT_ID and SH_APIKEY, as mentioned above:
export SH_APIKEY=<apikey>
PROJECT_ID=<project id> scrapy crawl mybroadcrawlspider -a start_url=<a starting url> -o items.jl
This crawl, as in the previous version, will ideally run "forever". However, it has many advantages with respect to the non crawl frontier version:
- It uses much less memory and disk, because now the requests are sent to the frontier, so risks of stopping because of exhausted memory or disk space due to the amount of requests generated, disappear.
- If for any reason it stops, you can resume it starting from the point it stopped. Try the following: cancel the crawl process with Ctrl-C, and then resume with:
PROJECT_ID=<project id> scrapy crawl mybroadcrawlspider -o items.jl
You don't even need a starting url.
An essential feature of HCF is request deduplication: any request sent to the frontier that has the same fingerprint as a request already sent will be filtered out. scrapy-frontera ensures that the fingerprint scheme used is the same one that scrapy uses internally, so deduplication in HCF will work exactly as you expect. Moreover, you can use the dont_filter=True flag in your requests, and the request will not be deduplicated. The scrapy-frontera adaptation also takes care of that.
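The deduplication behavior can be pictured with a small in-memory sketch. Both `toy_fingerprint` (a SHA1 of method and URL only) and `ToySlot` are simplifications made up for illustration; they are neither Scrapy's real fingerprint scheme nor how HCF stores requests.

```python
import hashlib


def toy_fingerprint(method, url):
    """Simplified fingerprint (assumption): SHA1 of the method and URL only."""
    return hashlib.sha1(f"{method} {url}".encode("utf-8")).hexdigest()


class ToySlot:
    """Hypothetical frontier slot that remembers every fingerprint it has seen."""

    def __init__(self):
        self.seen = set()
        self.queue = []

    def add(self, method, url, dont_filter=False):
        fingerprint = toy_fingerprint(method, url)
        if fingerprint in self.seen and not dont_filter:
            return False  # duplicate: filtered out
        self.seen.add(fingerprint)
        self.queue.append(url)
        return True


slot = ToySlot()
first = slot.add("GET", "https://example.com/a")
duplicate = slot.add("GET", "https://example.com/a")
forced = slot.add("GET", "https://example.com/a", dont_filter=True)
print(first, duplicate, forced, len(slot.queue))
```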
If you want to split the crawl into small jobs that stop automatically (and that you can resume later), you can either use one of the CLOSESPIDER_* settings, or limit the job length via any of the hcf settings HCF_CONSUMER_MAX_BATCHES or HCF_CONSUMER_MAX_REQUESTS (see the hcf-backend code docstring). For example:
class MyBroadCrawlSpider(Spider):
    name = 'mybroadcrawlspider'
    frontera_settings = {
        'HCF_PRODUCER_FRONTIER': name,
        'HCF_CONSUMER_FRONTIER': name,
        'HCF_CONSUMER_MAX_BATCHES': 10,
    }
    (...)
This way, a single spider job will consume at most 10 HCF batches. Each HCF batch contains up to 100 requests, and once all of them are processed, the job will stop.
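The arithmetic behind these limits is simple enough to sketch. The helper names are hypothetical, and the job count is only a lower bound, since it assumes that every batch is full (100 requests, the upper bound stated above).

```python
import math

REQUESTS_PER_BATCH = 100  # upper bound per HCF batch, as stated in the text


def max_requests_per_job(max_batches, requests_per_batch=REQUESTS_PER_BATCH):
    """Upper bound on the requests a single job consumes from the frontier."""
    return max_batches * requests_per_batch


def min_jobs_needed(pending_requests, max_batches, requests_per_batch=REQUESTS_PER_BATCH):
    """Lower bound on the number of jobs, assuming every batch is full."""
    per_job = max_requests_per_job(max_batches, requests_per_batch)
    return math.ceil(pending_requests / per_job)


print(max_requests_per_job(10))      # HCF_CONSUMER_MAX_BATCHES = 10
print(min_jobs_needed(25_000, 10))   # jobs needed to drain 25,000 pending requests
```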
And remember that you can inspect the content of the frontier at any time with the hcfpal tool.
In a previous section we saw how we can automate the scheduling of a spider via the shub-workflow crawl manager. We can use it for rescheduling the spider indefinitely, thus making the global crawl stable over time:
PROJECT_ID=<project id> crawlmanager.py mybroadcrawlspider --loop-mode=120 \
--spider-args '{"start_url": "<a start url>"}'
Jobs will be scheduled on the target ScrapyCloud project. If they finish prematurely with this error message:
builtins.RuntimeError: No API key provided and SH_APIKEY environment variable not set
it means that you forgot to set the SH_APIKEY environment variable in your project Dockerfile.
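For local runs, a small preflight check can catch a missing variable before any job is started. The helper `missing_variables` is hypothetical; the default list assumes a local run, where both SH_APIKEY and PROJECT_ID are needed (on ScrapyCloud the project is autodetected).

```python
import os


def missing_variables(environ=None, required=("SH_APIKEY", "PROJECT_ID")):
    """Return the required environment variables that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in required if not environ.get(name)]


# Examples with explicit dicts instead of the real environment
without_key = missing_variables({"PROJECT_ID": "12345"})
complete = missing_variables({"PROJECT_ID": "12345", "SH_APIKEY": "secret"})
print(without_key, complete)
```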
So far, this approach is enough in many situations. But when you need a high throughput crawl, you will find some limitations, like network I/O bandwidth, CPU capacity, or target site session bandwidth. The network bandwidth limitation can be partly or mostly overcome, depending on the case, with a proxy service like Crawlera. But part of the network bandwidth limitation may still reside in the limited resources of the ScrapyCloud container where your crawler is running. On the other hand, in the case of sites that need to be crawled from multiple sessions, it is possible to implement your spider to handle several sessions in a single job. However, not only may you easily hit the resource limits as the crawl speed increases, but the logic of the spider will also grow in complexity, errors and difficulty to track and maintain. So, whether you have container resource limitations (CPU being the most common one) or the target site has to be crawled in multiple sessions, crawler parallelization is the standard approach. And frequently, that includes separating link discovery from page processing, that is, separating the producer role from the consumer role. As a plus, parallelization offers an elastic allocation of the resources devoted to a crawl.
HCF allows parallelization by splitting a frontier into an arbitrary number of slots. In fact, in our example we already used one implicit frontier slot: the slot '0' (if you inspected the frontier with hcfpal during your tests, you already discovered that). By default, when you don't specify the number of slots, hcf-backend will create a single one named '0', and all requests will go there. It is important to understand that request deduplication works per slot. So if you add a request more than once to the same slot, HCF will filter out further instances after the first one. But that will not happen if you add the same request to different slots. scrapy-frontera, however, takes care of always mapping a given request fingerprint to the same slot. But this mapping will work correctly only under a fixed number of slots. If you change this number, the mapping will vary. So, to preserve consistency, you shouldn't change this number unless you restart the crawl from zero, previously deleting all frontier slots. A good practice is then to create enough slots to cover a future increase in parallelization during the lifetime of a single crawl: the maximum number of parallel consumer jobs is limited by the number of slots the frontier is split into.
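The effect of changing the number of slots can be seen with a toy mapping. The function `toy_slot_for` (MD5 of the fingerprint modulo the number of slots) is an assumption made for illustration and is not necessarily the mapping scrapy-frontera implements; the point is only that any such mapping is stable for a fixed slot count and shifts when the count changes.

```python
import hashlib


def toy_slot_for(fingerprint, prefix, number_of_slots):
    """Hypothetical mapping: hash of the fingerprint modulo the number of slots."""
    digest = hashlib.md5(fingerprint.encode("utf-8")).hexdigest()
    return f"{prefix}{int(digest, 16) % number_of_slots}"


fingerprints = [f"fp-{i}" for i in range(1000)]

with_8 = {fp: toy_slot_for(fp, "test", 8) for fp in fingerprints}
again_8 = {fp: toy_slot_for(fp, "test", 8) for fp in fingerprints}
with_12 = {fp: toy_slot_for(fp, "test", 12) for fp in fingerprints}

# Same number of slots: every fingerprint lands in the same slot again
print(with_8 == again_8)
# Different number of slots: many fingerprints move, so per-slot dedup is lost for them
moved = sum(1 for fp in fingerprints if with_8[fp] != with_12[fp])
print(moved, "of", len(fingerprints), "fingerprints changed slot")
```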
In the case of our broad crawl example, parallelization is not directly possible because by design (for high performance) HCF does not support multiple producers writing to the same slot at the same time. At first glance we could think that if each parallel job reads from and writes into a single slot, we will bypass the problem. However, that implies losing request deduplication between jobs. As a consequence, by doing that we will only achieve several independent crawlers, rather than a single parallelized crawl. A smarter approach is to split the crawl space in such a way that each crawler only writes the requests mapped to the slot it is consuming, but this will result in losing links: the ones that are not referred to by any page in the same crawl space. And this effect will worsen the more splits you have. It is possible to override the default request mapping that scrapy_frontera implements and, for example, map requests to slots by domain instead of by request fingerprint. In this case, we may lose entire domains when they are not referred to by another domain in the same crawl space, but the limitation could be overcome by a separate process for discovering domains. Depending on your needs, there are a bunch of possible approaches. However, it is not the intention of this document to discuss broad crawler strategies. And if you don't need extremely high speed, you may not even need to parallelize the crawl, with its cost of more complexity, lost requests, etc.
So let's leave aside our broad crawler example and focus on more mundane use cases. Despite the described limitation, in most use cases we can not only bypass it, but it also imposes the adoption of better practices for organizing the crawler architecture. For example, in most use cases we can separate the crawling job into a single producer and multiple consumers. Or we can have multiple producers, each one producing requests intended to be processed by a different spider, and so each one writing to a different set of slots. For example, some links are sent to a spider that extracts product data while others are sent to a spider that extracts brand data. Either way, producer spiders can take care of the discovery of links (crawling only), while consumers take care of processing pages (data extraction only). Very frequently the discovery of links can be performed over different pages than the ones from which we extract data, so in those cases this approach will not even imply downloading the same pages twice.
And if you can organize your crawler in those ways, it is much better than having a single spider that does everything. Separation of tasks reduces the complexity and maintenance burden of the crawler. It also allows a producer job to discover links much faster while leaving the heavy task of data extraction and storage to other jobs. Additionally, as link discovery is a light task while extraction is a heavy one, not only do producers benefit less from parallelization, but there is also no benefit in generating links much faster than the consumers can read them. Having a single producer for dozens of consumers, for example, is a typical situation, and the consumers will still be the speed bottleneck.
A typical use case of this architecture is when you have a set of seed urls from which to start the discovery of article links that need to be downloaded in order to extract article data. We can have a single-job spider that starts crawling from the seeds and sends the article links it discovers to a frontier with N slots (let's call this spider myproject-links), and a parallelized consumer spider that reads article requests from these slots, downloads the pages, and extracts and stores the article data (let's call this spider myproject-articles).
The producer can be configured in this way:
class MyProjectLinksSpider(Spider):
    name = 'myproject-links'
    frontera_settings = {
        'HCF_PRODUCER_FRONTIER': 'myproject-articles',
        'HCF_PRODUCER_SLOT_PREFIX': 'test',
        'HCF_PRODUCER_NUMBER_OF_SLOTS': 8,
    }
    custom_settings = {
        'FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER': ['parse_article'],
    }
    def parse_article(self, response):
        # placeholder: the real implementation lives in the myproject-articles spider
        pass
    def parse(self, response):
        (... link discovery code ...)
Some new elements are introduced here: the hcf settings HCF_PRODUCER_SLOT_PREFIX and HCF_PRODUCER_NUMBER_OF_SLOTS. The default values for these settings are '' and 1 respectively. That is why, as we saw before, when those settings are not provided, the hcf backend generates a single slot named '0'. The configuration in this new example will instead generate 8 slots, named from 'test0' to 'test7'.
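The slot naming rule just described (prefix followed by the slot index) can be written as a one-line helper. The function name `slot_names` is hypothetical; only the naming rule and the defaults come from the text.

```python
def slot_names(prefix='', number_of_slots=1):
    """Slot names as described in the text: the prefix followed by the slot index."""
    return [f"{prefix}{index}" for index in range(number_of_slots)]


print(slot_names())            # defaults: HCF_PRODUCER_SLOT_PREFIX='' and 1 slot
print(slot_names('test', 8))   # the configuration used in the producer example
```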
An aspect you may find weird is the implementation of a parse_article() method that does nothing. This is required because the requests that this spider sends to the frontier need to reference a callback with the same name as the one in the myproject-articles spider that will process the response. That is, you will create the requests here with Request(..., callback=self.parse_article,...), but the actual implementation of parse_article() is not in this spider, but in myproject-articles instead.
An alternative to the setting FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER is to add the request meta key 'cf_store': True to the requests that have to be sent to the frontier. This requires, however, more intervention in the spider code and less separation of implementation from configuration. But it is up to the taste of the developer which approach to use. cf_store is really kept for backward compatibility with older versions of scrapy-frontera, and for occasional fine tuning requirements.
On the side of the consumer spider:
class MyProjectArticlesSpider(Spider):
    name = 'myproject-articles'
    frontera_settings = {
        'HCF_CONSUMER_FRONTIER': 'myproject-articles',
        'HCF_CONSUMER_MAX_BATCHES': 50,
    }
    def parse_article(self, response):
        (... actual implementation of the article parsing method ...)
    (...)
We need to indicate the slot that a consumer job must consume. This is set via the hcf setting HCF_CONSUMER_SLOT. But we can't hard code this setting, because different jobs of the same spider will read from different slots. So this setting has to be passed via command line. However, manually setting up eight separate jobs, each one with a different slot, is not practical. A specialized hcf crawl manager will help here. Create the following script with the name scripts/hcf_crawlmanager.py:
from hcf_backend.utils.crawlmanager import HCFCrawlManager
if __name__ == '__main__':
    hcf_crawlmanager = HCFCrawlManager()
    hcf_crawlmanager.run()
This crawl manager is a subclass of the one described above. The command line is the same except for two additional positional arguments, frontier and prefix:
# hcf_crawlmanager.py -h
usage:
Script for easy managing of consumer spiders from multiple slots. It checks available slots and schedules
a job for each one.
[-h] [--project-id PROJECT_ID] [--name NAME] [--flow-id FLOW_ID]
[--loop-mode SECONDS] [--max-running-jobs MAX_RUNNING_JOBS] [--tag TAG]
[--spider-args SPIDER_ARGS] [--job-settings JOB_SETTINGS]
[--units UNITS]
spider frontier prefix
positional arguments:
spider Spider name
frontier Frontier name
prefix Slot prefix
(...)
So, in order to run this crawler, you first schedule the producer and, after a couple of minutes (to ensure the producer has already started to write requests to the frontier), the hcf crawlmanager:
# export PROJECT_ID=<project_id>
# crawlmanager.py myproject-links
# sleep 120; hcf_crawlmanager.py myproject-articles myproject-articles test --loop-mode=120
The hcf_crawlmanager loop checks which slots are available, that is, which ones have requests pending to be processed and do not already have a spider job consuming them. Then it randomly schedules a new job for consuming any of the available ones, passing the hcf setting HCF_CONSUMER_SLOT with the corresponding value. The HCF crawl manager continues looping until all the slots have been fully consumed and all consumer jobs have completed.
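One iteration of such a loop can be sketched in plain Python. The functions `available_slots` and `schedule_round` are hypothetical and do not reproduce the HCFCrawlManager code; `max_running_jobs` mirrors the `--max-running-jobs` option shown in the command line help, and the way it caps scheduling here is an assumption.

```python
import random


def available_slots(pending_by_slot, running_slots):
    """Slots with pending requests that no running job is consuming yet."""
    return sorted(
        slot for slot, pending in pending_by_slot.items()
        if pending > 0 and slot not in running_slots
    )


def schedule_round(pending_by_slot, running_slots, max_running_jobs, rng=random):
    """One loop iteration: pick random available slots up to the job limit."""
    scheduled = []
    candidates = available_slots(pending_by_slot, running_slots)
    rng.shuffle(candidates)
    for slot in candidates:
        if len(running_slots) + len(scheduled) >= max_running_jobs:
            break
        # A real manager would schedule a consumer job here with HCF_CONSUMER_SLOT=slot
        scheduled.append(slot)
    return scheduled


pending = {'test0': 120, 'test1': 0, 'test2': 35, 'test3': 8}
running = {'test2'}  # a job is already consuming this slot

print(available_slots(pending, running))
print(schedule_round(pending, running, max_running_jobs=8))
```

In this sketch the loop would be finished once `available_slots` returns an empty list and no consumer job is still running.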
Once the first tests of your crawler have run OK, you may want to override the slot prefix in order to use production slots:
# crawlmanager.py myproject-links --spider-args '{"frontera_settings_json": {"HCF_PRODUCER_SLOT_PREFIX": "production"}}'
# sleep 120; hcf_crawlmanager.py myproject-articles myproject-articles production --loop-mode=120
Notice that in the first step we are using the crawl manager only for easy scheduling of the links spider from console into ScrapyCloud. From ScrapyCloud, you can schedule the spider directly.
The two step workflow above is a good candidate for introducing the Graph Manager. This manager allows you to define complex workflows with an arbitrary number of chained and parallel tasks, impose chain conditions, wait times, etc. Let's add a graph manager script for defining this specific workflow, so we can deploy it on ScrapyCloud. The script name could be scripts/manager_articles.py:
import json
from shub_workflow.graph import GraphManager
from shub_workflow.graph.task import Task, SpiderTask
class ArticlesManager(GraphManager):
    name = 'articles'
    loop_mode = 90
    frontier = 'myproject-articles'
    def add_argparser_options(self):
        super().add_argparser_options()
        self.argparser.add_argument('--slot-prefix', default='test')
    def configure_workflow(self):
        producer_frontera_settings = {
            'HCF_PRODUCER_SLOT_PREFIX': self.args.slot_prefix,
            'HCF_PRODUCER_FRONTIER': self.frontier
        }
        linksTask = SpiderTask('links', 'myproject-links',
                               frontera_settings_json=json.dumps(producer_frontera_settings))
        articlesTask = Task('articles', 'hcf_crawlmanager.py', wait_time=120,
                            init_args=['myproject-articles', self.frontier,
                                       self.args.slot_prefix, '--loop-mode=120'])
        return linksTask, articlesTask
Then, you invoke this manager as:
# manager_articles.py --starting-job=links --starting-job=articles --slot-prefix=production
This command line execution reproduces almost exactly the same workflow defined at the end of the previous section: the scheduling of the myproject-links spider, then a wait time of 120 seconds, and finally the scheduling of hcf_crawlmanager.py for controlling the myproject-articles spider consumer jobs. There are, however, three differences:
- All the scheduled jobs, from the graph manager to the spiders scheduled by the hcf crawl manager, will have exactly the same FLOW_ID. Having the same FLOW_ID for all jobs of the same workflow instance allows easier tracking among the many other jobs in the same ScrapyCloud project. This property is relevant not only for monitoring and following the workflow along many jobs. It will also provide other advantages that we will see later.
- The producer spider will not be scheduled by a crawl manager, but directly by the graph manager, through the wrapper SpiderTask (more on this below).
- The hcf manager job will not wait exactly 120 seconds, as we will see.
Let's discuss how the workflow is defined. The method configure_workflow() is where you declare the tasks and their workflow. It must return a tuple containing the root jobs of the workflow tree, which are most frequently, but not necessarily, the same as the actual starting jobs passed in the command line. While root jobs are part of the topological declaration of the workflow, starting jobs are the jobs from which you want to start the execution of a particular workflow instance, and they may not even be root jobs at all. The producer spider job is represented by the line
linksTask = SpiderTask('links', 'myproject-links',
                       frontera_settings_json=json.dumps(producer_frontera_settings))
The SpiderTask class wraps a spider job. The instance here is initialized with a task name, the spider name, and the spider job arguments, in this case frontera_settings_json. Notice that we are passing HCF_PRODUCER_FRONTIER and HCF_PRODUCER_SLOT_PREFIX in the frontera settings, so we can remove these settings from the producer spider code. This measure delegates the control of the frontier to the graph manager. On the other hand, the hcf manager job that controls the consumer spider jobs is represented by the line
articlesTask = Task('articles', 'hcf_crawlmanager.py', wait_time=120,
                    init_args=['myproject-articles', self.frontier,
                               self.args.slot_prefix, '--loop-mode=120'])
The Task class wraps a script job. We initialize it with a task name, the script name, a wait time of 120 seconds, and the script command line arguments. For a detailed reference on the constructor arguments of the Task and SpiderTask classes, see the task.py module.
The result of instantiating this topology with both the producer and consumer tasks as starting jobs is that both tasks are added to the internal pending queue when the workflow starts, that is, in the first workflow execution loop. But while the links job is scheduled immediately, the articles job is not, because it has to wait a minimum of 120 seconds, the time specified in the wait_time parameter. Because the loop_mode attribute of this graph manager was set to 90, the second execution loop takes place 90 seconds after the first one completes. The pending queue is checked and, because the wait time of 120 seconds has not yet elapsed, the articles job waits for another loop. Only during the third loop, that is, another 90 seconds after the second loop, or 180 seconds after it was added to the queue, is the articles job scheduled on ScrapyCloud. Notice, then, that while the wait time was set to 120, the actual wait time was around 180 seconds.
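The interplay between wait_time and loop_mode reduces to rounding the wait time up to the next multiple of the loop period. The helper `effective_wait` is hypothetical, and it assumes that the pending queue is checked exactly once every loop_mode seconds and that each loop takes negligible time.

```python
import math


def effective_wait(wait_time, loop_mode):
    """Seconds until the first loop at which wait_time has elapsed.

    Assumes the pending queue is only checked once every loop_mode seconds,
    starting at time zero, and ignores the time each loop takes to run.
    """
    return math.ceil(wait_time / loop_mode) * loop_mode


print(effective_wait(120, 90))  # the example above: scheduled on the third loop
print(effective_wait(120, 60))  # a loop period that divides the wait time exactly
```

Choosing a loop period that divides the wait time keeps the actual wait close to the configured one.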
What if the producer spider needs a long time to complete and a failure in the middle stops it prematurely, thus losing a significant number of links? If we want to avoid that risk, and we don't want to reschedule the producer from zero, we can make the producer resumable with HCF. But we cannot write the links that are intended to be followed by our producer to the same slots destined for our consumer, the articles spider. We need a way to map the producer's own requests to a different set of slots. That feature is provided by the scrapy-frontera setting FRONTERA_SCHEDULER_CALLBACK_SLOT_PREFIX_MAP. Let's reformulate our article links producer, then:
class MyProjectLinksSpider(Spider):
    name = 'myproject-links'
    custom_settings = {
        'FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER': ['parse_article', 'parse'],
    }
    def parse_article(self, response):
        # placeholder: the real implementation lives in the myproject-articles spider
        pass
    def parse(self, response):
        (... link discovery code ...)
Notice the addition of 'parse' to the FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER list. Also, all frontera settings were removed from the spider code. They will be controlled by the graph manager, modified as follows:
import json
from shub_workflow.graph import GraphManager
from shub_workflow.graph.task import Task, SpiderTask
class ArticlesManager(GraphManager):
    name = 'articles'
    loop_mode = 90
    frontier = 'myproject-articles'
    def add_argparser_options(self):
        super().add_argparser_options()
        self.argparser.add_argument('--slot-prefix', default='test')
        self.argparser.add_argument('--num-slots', default=8, type=int)
    def configure_workflow(self):
        producer_spider_args = {
            "frontera_settings_json": {
                "HCF_PRODUCER_NUMBER_OF_SLOTS": 1,
            }
        }
        producer_settings = {
            'FRONTERA_SCHEDULER_CALLBACK_SLOT_PREFIX_MAP': {
                'parse_article': f'{self.args.slot_prefix}/{self.args.num_slots}',
            },
        }
        # the spider name must be a quoted string
        linksTask = Task('links', 'hcf_crawlmanager.py', init_args=[
            'myproject-links', self.frontier, 'links', '--loop-mode=120',
            '--job-settings=%s' % json.dumps(producer_settings),
            '--spider-args=%s' % json.dumps(producer_spider_args),
        ])
        consumer_spider_args = {
            "frontera_settings_json": {
                'HCF_CONSUMER_FRONTIER': 'myproject-articles',
                'HCF_CONSUMER_MAX_BATCHES': 50,
            }
        }
        articlesTask = Task('articles', 'hcf_crawlmanager.py', wait_time=120,
                            init_args=['myproject-articles', self.frontier,
                                       self.args.slot_prefix, '--loop-mode=120',
                                       '--spider-args=%s' % json.dumps(consumer_spider_args)])
        return linksTask, articlesTask
Some things changed with respect to the previous versions:
- The links task is now an hcf crawlmanager task instead of a spider task. This hcf crawlmanager will control the links spider as a consumer of the requests it generates for itself. That is the reason why the slot prefix argument value is 'links', and why we send, through the spider arguments, the frontera setting HCF_PRODUCER_NUMBER_OF_SLOTS with value 1.
- We pass FRONTERA_SCHEDULER_CALLBACK_SLOT_PREFIX_MAP as a job setting for the links spider. It indicates to which slots to send the requests destined for the articles spider. It is built from the command line option --slot-prefix and a newly added one, --num-slots.
- All frontera settings for the articles spider are now controlled from the graph manager code and passed through the spider arguments as well. So, the frontera settings can be removed from the articles spider code.
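The map value built by the graph manager has the form '<prefix>/<number of slots>'. A minimal sketch of how such a value could be interpreted follows. Both helpers are hypothetical and are not scrapy-frontera code, and falling back to a single slot when the '/<number>' part is missing is an assumption.

```python
def parse_slot_prefix_spec(spec, default_number_of_slots=1):
    """Split a '<prefix>/<number of slots>' value into its two parts.

    Assumption: when the '/<number>' part is missing, a single slot is used.
    """
    prefix, _, number = spec.partition('/')
    return prefix, (int(number) if number else default_number_of_slots)


def slots_for_callback(callback_name, prefix_map):
    """Names of the slots that requests with the given callback may be written to."""
    if callback_name not in prefix_map:
        return None  # not mapped: the producer's default slots apply
    prefix, number_of_slots = parse_slot_prefix_spec(prefix_map[callback_name])
    return [f"{prefix}{index}" for index in range(number_of_slots)]


# Value produced with --slot-prefix=production --num-slots=8
prefix_map = {'parse_article': 'production/8'}
print(slots_for_callback('parse_article', prefix_map))
print(slots_for_callback('parse', prefix_map))
```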
As a result:
- We now have a links spider that is a producer for the articles spider, and also a producer/consumer for itself. The hcf crawlmanager will take care of rescheduling it in case it finishes for any reason while there are still requests to be consumed from its own links slots.
- We have, as before, the articles spider, which is a consumer of the article links produced by the links spider.
- All frontera related settings, except for the FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER setting of the links spider, are under the control of the graph manager. We could also move the control of that setting to the graph manager. However, we judged that, unlike the rest of the settings, this one is coupled to the implementation of the spiders, so it is better to keep it there.
When you have a large crawl split into many jobs, delivering the data into a single file can become a pain. Shub-workflow provides a deliver script that facilitates this by following all the jobs of the given spider (or given list of spiders) that have the same FLOW_ID as the delivery script. Let's suppose we want to deliver our job items to s3 (for the moment, the only protocol supported), so we would add to our project the script scripts/deliver.py:
from shub_workflow.deliver import S3DeliverScript
class MyDeliverScript(S3DeliverScript):
    s3_bucket_name = 'myproject_bucket'
if __name__ == '__main__':
    deliver = MyDeliverScript()
    deliver.run()
For fast deployment, this s3 delivery script provides some defaults for the file format and the file name format. However, most projects require customization. The class S3DeliverScript is designed for easy overriding of methods in order to provide very flexible customization. Check the docstring and the code for more details.
So, how do we add a deliver job to our workflow? We need to run it once our articles job is finished, so we have to add the following lines to our graph manager configure_workflow() method:
def configure_workflow(self):
    (...)
    deliverTask = Task('deliver', 'deliver.py', init_args=['myproject-articles'])
    articlesTask.add_next_task(deliverTask)
    (...)
We created a new task, deliverTask, which will deliver the items generated by all the myproject-articles jobs belonging to the current FLOW_ID (every other job will be ignored). And we added it to the workflow via the add_next_task() method of articlesTask. In this way we ensure that deliverTask will run once articlesTask is completed. And because this task was added via add_next_task(), we don't need to include it in the return value of configure_workflow() (remember that this return value must include only the root tasks).
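The relation between root tasks and chained tasks can be pictured with a toy graph. `ToyTask` and `execution_order` are hypothetical stand-ins and not the shub_workflow classes; the walk only shows that a chained task is reachable from a root task, which is why it does not need to be returned.

```python
class ToyTask:
    """Hypothetical stand-in for a workflow task; not the shub_workflow Task class."""

    def __init__(self, name):
        self.name = name
        self.next_tasks = []

    def add_next_task(self, task):
        self.next_tasks.append(task)


def execution_order(root_tasks):
    """Breadth-first walk: a task is visited only after the task that points to it."""
    order, queue, seen = [], list(root_tasks), set()
    while queue:
        task = queue.pop(0)
        if task.name in seen:
            continue
        seen.add(task.name)
        order.append(task.name)
        queue.extend(task.next_tasks)
    return order


links = ToyTask('links')
articles = ToyTask('articles')
deliver = ToyTask('deliver')
articles.add_next_task(deliver)

# Only the root tasks are returned; 'deliver' is discovered through 'articles'
order = execution_order([links, articles])
print(order)
```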
Finally, a good workflow must include the cleaning of the frontier slots for a future crawl:
def configure_workflow(self):
    (...)
    cleanTask = Task('clean', 'hcfpal.py', init_args=['myproject-articles', self.args.slot_prefix])
    articlesTask.add_next_task(cleanTask)
    cleanLinksTask = Task('cleanLinks', 'hcfpal.py', init_args=['myproject-articles', 'links'])
    linksTask.add_next_task(cleanLinksTask)
    (...)