Managing Hubstorage Crawl Frontiers
Previous Chapter: Crawl Managers
pip install hcf-backend
pip install scrapy-frontera
For big crawls, a single spider job is usually problematic. If the spider stops for any reason, you have to recrawl from zero, losing both time and resources. If you are reading this documentation, I don't have to explain how painful a stop can be when a huge crawl runs as a single job: you are most probably here because you are searching for a better approach for your project.
A crawl frontier is a store of requests that can be filled and consumed progressively by different processes. ScrapyCloud provides a native crawl frontier, Hubstorage Crawl Frontier (HCF).
We provide an HCF backend for Frontera, which in turn is a framework for developing crawl frontier handlers (backends), establishing a common protocol and a common interface, plus some extra facilities. For Scrapy spiders, the equation is completed with scrapy-frontera, which provides a Scrapy scheduler that supports Frontera without losing any native behavior of the original Scrapy scheduler, so migrating a Scrapy spider to Frontera becomes painless.
When working with spiders we can identify two differentiated roles: a producer role and a consumer role, depending on whether the spider writes requests to a request queue or reads them from it in order to actually process them. Usual Scrapy spiders assume both roles at the same time, transparently: the same spider writes requests to a local memory/disk queue and later reads them from it. We don't usually think of a Scrapy spider as a producer/consumer because it natively works both ways at the same time. But that is the logic behind the native Scrapy scheduler, which takes care of:
- Queuing requests coming from the spider, by default into a dual memory/disk queue.
- Dequeuing requests into the downloader, under specific prioritization rules.
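For context, the dual memory/disk queue behavior is governed by the stock Scrapy settings shown below (these are Scrapy's defaults, reproduced here only as a reference; they keep applying to the requests that are not sent to the frontier):

```python
# Scrapy defaults for the native scheduler queues (shown for context only)
SCHEDULER_MEMORY_QUEUE = "scrapy.squeues.LifoMemoryQueue"
SCHEDULER_DISK_QUEUE = "scrapy.squeues.PickleLifoDiskQueue"   # used when JOBDIR is set
SCHEDULER_PRIORITY_QUEUE = "scrapy.pqueues.ScrapyPriorityQueue"
```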
On the other hand, the scheduler implemented in the scrapy-frontera package, which is a subclass of the Scrapy scheduler, provides the interface with Frontera and allows specific requests to be sent to the Frontera backend, either from spider code or through project settings. In this way, the Frontera backend assumes the responsibility of queuing, dequeuing and prioritizing the requests sent to or read from it, while the remaining requests follow the usual flow within Scrapy. Still, the scheduler asks the Frontera backend for requests, adapts them and puts them into its local queues.
So, when using an external frontier like HCF, we can separate the producer and consumer roles into different spiders, and this division of roles becomes more evident. Whether we separate roles or not depends on our specific implementation. As we will see, it is possible to adapt a Scrapy spider to work with HCF with minimal effort, both as producer and as consumer.
Each ScrapyCloud project has its own HCF space, separated from the rest. Within a project, HCF can be subdivided via frontier names, which in turn can be subdivided into slots via slot names. Frontier names and slot names are created on the fly by the producer agents. URLs in HCF are organized into batches. All the URLs of a batch are read and deleted as a block by the consumer. The number of URLs per batch is set by the producer, with a limit of 100.
An essential feature of HCF is request deduplication: any request sent to a given slot that has the same fingerprint as a previous request already sent to the same slot will be filtered out. HCF ensures that requests with the same fingerprint are sent to the same slot. Also, scrapy-frontera ensures that the fingerprint scheme used is the same one Scrapy uses internally, so deduplication in HCF works exactly as you would expect in a spider that does not use HCF. Moreover, you can use the dont_filter=True flag in your requests and the request will not be deduplicated: the scrapy-frontera adaptation takes care of that by generating a random fingerprint in this case.
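As an illustration (a minimal sketch with a hypothetical spider and URL, not taken from the tutorial project), a producer that yields the same URL twice will only get it written to the frontier once, unless dont_filter is used:

```python
from scrapy import Spider, Request


class DedupIllustrationSpider(Spider):
    # hypothetical spider, only to illustrate HCF deduplication behavior
    name = "dedup-illustration"

    def parse(self, response):
        url = "https://example.com/article/1"
        # First occurrence: written to the frontier slot chosen by its fingerprint.
        yield Request(url, callback=self.parse_article)
        # Same fingerprint, hence same slot: HCF filters this one out.
        yield Request(url, callback=self.parse_article)
        # dont_filter=True: scrapy-frontera generates a random fingerprint,
        # so HCF will not deduplicate this request.
        yield Request(url, callback=self.parse_article, dont_filter=True)

    def parse_article(self, response):
        pass
```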
Another important feature of HCF is that writing to a specific slot must be done without concurrency. That is, only one producer at a time can write batches to a given slot. This limitation gives HCF increased performance, but imposes some constraints that have to be considered when designing the project. In most cases it is enough to have a single producer instance writing to all slots, and multiple consumer instances in parallel, each one consuming from a different slot. But if you need multiple producer instances in parallel, you could enforce that each one only sends the URLs assigned to a specific slot and ignores the rest. This would, however, result in missing URLs that were only seen by a single producer, so a working implementation needs to be more elaborate, for example by sending all URLs to a queue processed by a single high-throughput pipeline and delegating the writing to HCF to it.

Whether this approach leads to better or worse performance than having a single producer ultimately depends on the specific application. Most probably, unless you need to implement a very high throughput broad crawler, the single-producer approach is faster and uses fewer resources.
`hcf-backend` comes with a handy tool for managing (deleting, listing, dumping, counting, etc.) HCF objects, `hcfpal.py`:

python -m hcf_backend.utils.hcfpal

If you need to make it available on a project deployed on ScrapyCloud you need, as usual, to define your own script, i.e. `scripts/hcfpal.py`:
```python
from hcf_backend.utils.hcfpal import HCFPalScript

if __name__ == '__main__':
    script = HCFPalScript()
    script.run()
```
Follow the command line help for usage instructions.
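For example, the delete subcommand used later in this chapter takes a frontier name and a slot prefix (check the `--help` output for the exact signature, as it may differ between versions):

$ python -m hcf_backend.utils.hcfpal delete mysite-articles-frontier test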
The first step is to add scrapy-frontera and hcf-backend to your project requirements. Then, configure your project to work with scrapy-frontera. From the scrapy-frontera README, this is the basic configuration you need to add to your project settings.py file in order to replace the native Scrapy scheduler with the scrapy-frontera one:
```python
# shut up frontera DEBUG flooding
import logging
logging.getLogger("manager.components").setLevel(logging.INFO)
logging.getLogger("manager").setLevel(logging.INFO)
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)

SCHEDULER = 'scrapy_frontera.scheduler.FronteraScheduler'

DOWNLOADER_MIDDLEWARES = {
    'scrapy_frontera.middlewares.SchedulerDownloaderMiddleware': 0,
}

SPIDER_MIDDLEWARES = {
    'scrapy_frontera.middlewares.SchedulerSpiderMiddleware': 0,
}
```
These changes alone will not affect the behavior of a Scrapy project, but they will allow you to implement spiders that work with HCF.
Producers and consumers need to be configured with different frontera and scrapy settings. So, depending on your needs and taste, you have two alternatives:
- Method 1: Have two separate spiders, one for the producer and another for the consumer.
- Method 2: Have a single spider for both, configuring the settings at runtime.

Method 1 can be enough for simple cases with a simple workflow. Method 2 allows you to adapt an already developed spider without changing a single line of spider code; it works like plugging an existing spider into a frontier workflow system, but it requires the help of extra scripts. We will show both cases.
In this example we will implement two spiders, a producer and a consumer. The producer will crawl the target site, extract links and send them to the frontier. The consumer will read from the frontier and scrape the provided links.
The producer:
```python
from scrapy import Spider, Request


class MySiteLinksSpider(Spider):

    name = 'mysite.com-links'

    frontera_settings = {
        "BACKEND": "hcf_backend.HCFBackend",
        'HCF_PRODUCER_FRONTIER': 'mysite-articles-frontier',
        'HCF_PRODUCER_SLOT_PREFIX': 'test',
        'HCF_PRODUCER_NUMBER_OF_SLOTS': 8,
    }

    custom_settings = {
        'FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER': ['parse_article'],
    }

    def parse_article(self, response):
        pass

    def parse(self, response):
        (... link discovery logic ...)
        yield Request(..., callback=self.parse_article)
```
The consumer:
```python
from scrapy import Spider


class MySiteArticlesSpider(Spider):

    name = 'mysite.com-articles'

    frontera_settings = {
        "BACKEND": "hcf_backend.HCFBackend",
        'HCF_CONSUMER_FRONTIER': 'mysite-articles-frontier',
        'HCF_CONSUMER_MAX_BATCHES': 50,
    }

    def parse_article(self, response):
        (... actual implementation of the article parsing method ...)
        (...)
```
Let's review the frontera settings involved in the example:

- `BACKEND` - Sets the Frontera backend class. In these examples we will always use `hcf_backend.HCFBackend`.
- `HCF_PRODUCER_FRONTIER` and `HCF_CONSUMER_FRONTIER` - Set the HCF frontier name for the producer and the consumer. For producer and consumer spiders of the same crawl, it must be the same.
- `HCF_PRODUCER_SLOT_PREFIX` - Sets the prefix of the slots that will be generated. If not provided, it is `''` by default.
- `HCF_PRODUCER_NUMBER_OF_SLOTS` - Sets the number of slots. The default value is `1`. Slots generated by the producer will have names ranging from `{slot prefix}0` to `{slot prefix}{number of slots - 1}`.
- `HCF_CONSUMER_MAX_BATCHES` - Sets the limit of frontier batches that the consumer will process.
- `HCF_CONSUMER_SLOT` - Sets the slot from which the consumer will read frontier batches.
Notice that we are not setting `HCF_CONSUMER_SLOT` anywhere in the consumer code. If we only had one slot, that would make sense. However, if we want to run several consumers in parallel, which is the typical use case for a frontier, we need to pass this setting at runtime. The way to do this is via the spider argument `frontera_settings_json`, for example:
> scrapy crawl mysite.com-articles -a frontera_settings_json='{"HCF_CONSUMER_SLOT": "test3"}'
An aspect you may find weird is the implementation of a `parse_article()` method that does nothing. This is required because the requests that this spider sends to the frontier need to reference a callback with the same name as the one in the `mysite.com-articles` spider that will process the response. That is, you create the requests here with `Request(..., callback=self.parse_article, ...)`, but the actual implementation of `parse_article()` is not in this spider, but in `mysite.com-articles` instead.
For other HCF tuning settings refer to the [hcf backend](https://github.com/scrapinghub/hcf-backend/blob/master/hcf_backend/backend.py) documentation.
The configuration is completed with a setting on the Scrapy side (not the Frontera side): `FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER`. This setting tells the scrapy-frontera scheduler that every request whose callback name is in the provided list (except for the requests generated by `start_requests()`) will be sent to the frontier. An alternative to this setting is to add the meta key `'cf_store': True` to the requests that have to be sent to the frontier:
```python
    (...)

    def parse(self, response):
        (...)
        yield Request(..., meta={'cf_store': True})
```
This requires, however, more intervention in the spider code and less separation of implementation from configuration. It is up to the taste of the developer which approach to use. `cf_store` is really kept for backward compatibility with older versions of scrapy-frontera, and for eventual fine-tuning needs.
Notice that, while the HCF settings are added to the `frontera_settings` dict, the last setting is added to the `custom_settings` dict. This is because the HCF backend resides on the Frontera side, while `FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER` is a Scrapy scheduler setting, and so it is read on the Scrapy side. Optionally, you can place everything under `custom_settings` only, but separating scopes is a good practice.
Running all the consumer jobs manually and periodically, or setting up lots of periodic jobs, is definitely not a convenient approach. Rather, we will use the HCF crawl manager provided by hcf-backend. That is, save this code into `scripts/hcf_crawlmanager.py`:
```python
from hcf_backend.utils.crawlmanager import HCFCrawlManager

if __name__ == '__main__':
    hcf_crawlmanager = HCFCrawlManager()
    hcf_crawlmanager.run()
```
This crawl manager is a subclass of the generic shub-workflow crawl manager described in the previous chapter. The command line is the same, except for some additional positional arguments and options:
$ python hcf_crawlmanager.py --help
usage:
shub-workflow based script (runs on Zyte ScrapyCloud) for easy managing of consumer spiders from multiple slots. It checks available slots and schedules
a job for each one.
[-h] [--project-id PROJECT_ID] [--name NAME] [--flow-id FLOW_ID] [--tag TAG] [--loop-mode SECONDS] [--max-running-jobs MAX_RUNNING_JOBS] [--resume-workflow] [--spider-args SPIDER_ARGS]
[--job-settings JOB_SETTINGS] [--units UNITS] [--frontera-settings-json FRONTERA_SETTINGS_JSON]
spider frontier prefix
positional arguments:
spider Spider name
frontier Frontier name
prefix Slot prefix
optional arguments:
-h, --help show this help message and exit
--project-id PROJECT_ID
Overrides target project id.
--name NAME Script name.
--flow-id FLOW_ID If given, use the given flow id.
--tag TAG Additional tag added to the scheduled jobs. Can be given multiple times.
--loop-mode SECONDS If provided, manager will run in loop mode, with a cycle each given number of seconds. Default: 0
--max-running-jobs MAX_RUNNING_JOBS
If given, don't allow more than the given jobs running at once. Default: 1
--resume-workflow Resume workflow. You must use it in combination with --flow-id in order to set the flow id of the worklow you want to resume.
--spider-args SPIDER_ARGS
Spider arguments dict in json format
--job-settings JOB_SETTINGS
Job settings dict in json format
--units UNITS Set number of ScrapyCloud units for each job
--frontera-settings-json FRONTERA_SETTINGS_JSON
Here we add two positional arguments, frontier and slot prefix, plus the option `--frontera-settings-json`. By using this crawl manager we don't even need to provide any frontera settings in the consumer code, so we can remove them all and pass everything via the crawl manager:
$ SH_APIKEY=<your zyte apikey>
$ python hcf_crawlmanager.py mysite.com-articles mysite-articles-frontier test --frontera-settings-json='{"HCF_CONSUMER_MAX_BATCHES": 50, "BACKEND": "hcf_backend.HCFBackend"}' --max-running-jobs=8 --loop-mode=60
The manager will take care of scheduling up to 8 jobs in parallel, one per slot, as we also set the frontera setting `HCF_PRODUCER_NUMBER_OF_SLOTS` to 8 in the producer. Once a job completes the processing of 50 batches, a slot becomes free, and the crawl manager will schedule a new consumer job for it, repeating the cycle until all the batches are consumed. Notice the previous export of the `SH_APIKEY` environment variable. This API key is needed for accessing HCF resources in Zyte Scrapy Cloud.
When invoked in this way, by default the spiders will be scheduled in the ScrapyCloud project defined by the `default` entry in the `scrapinghub.yml` file. The target project can be overridden either via the `PROJECT_ID` environment variable, or more explicitly by adding the `--project-id` option to the command line invocation. If the script is invoked in Scrapy Cloud itself, by default the spiders will be scheduled in the same project where the script is running, but this can also be overridden with the `--project-id` option, which allows cross-project scheduling.
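For instance, to schedule the consumers in a different project (the project id here is hypothetical, shown only for illustration):

$ python hcf_crawlmanager.py mysite.com-articles mysite-articles-frontier test --frontera-settings-json='{"HCF_CONSUMER_MAX_BATCHES": 50, "BACKEND": "hcf_backend.HCFBackend"}' --project-id=123456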
Alternatively, you can provide some hard-coded default parameters in the HCF crawl manager script itself:
```python
from hcf_backend.utils.crawlmanager import HCFCrawlManager


class MyHCFCrawlManager(HCFCrawlManager):
    loop_mode = 60
    default_max_jobs = 8


if __name__ == '__main__':
    hcf_crawlmanager = MyHCFCrawlManager()
    hcf_crawlmanager.run()
```
So the command line can be shorter:
$ python hcf_crawlmanager.py mysite.com-articles mysite-articles-frontier test --frontera-settings-json='{"HCF_CONSUMER_MAX_BATCHES": 50, "BACKEND": "hcf_backend.HCFBackend"}'
Let's rewrite our two-spider crawler as a single spider, this time with no frontera-related hardcoded settings at all:
```python
from scrapy import Spider, Request


class MySiteArticlesSpider(Spider):

    name = 'mysite.com'

    def parse(self, response):
        (... link discovery logic ...)
        yield Request(..., callback=self.parse_article)

    def parse_article(self, response):
        (... actual implementation of the article parsing method ...)
```
We can run the spider as a normal one, without using the frontier. Or we can reproduce the same frontier workflow intended in the Method 1 section by using another external script, a graph manager. The graph manager allows you to automate the building of Zyte Scrapy Cloud workflows by defining a graph of tasks.
For this purpose we create a manager script (i.e. `scripts/manager_articles.py`) by subclassing the `GraphManager` class:
```python
import json
from typing import Tuple

from shub_workflow.graph import GraphManager
from shub_workflow.graph.task import Task, SpiderTask


class MyArticlesGraphManager(GraphManager):

    loop_mode = 120
    default_max_jobs = 8

    frontier = 'mysite-articles-frontier'

    def add_argparser_options(self):
        super().add_argparser_options()
        self.argparser.add_argument("--slot-prefix", default="test")

    def configure_workflow(self) -> Tuple[Task]:
        producer_frontera_settings = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_PRODUCER_FRONTIER": self.frontier,
            "HCF_PRODUCER_SLOT_PREFIX": self.args.slot_prefix,
            "HCF_PRODUCER_NUMBER_OF_SLOTS": self.args.max_running_jobs,
        }
        producer_settings = {
            "FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER": "parse_article",
        }
        discover = SpiderTask(
            "discover",
            "mysite.com",
            job_settings=producer_settings,
            frontera_settings_json=json.dumps(producer_frontera_settings),
        )

        consumer_frontera_settings = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_CONSUMER_FRONTIER": self.frontier,
            "HCF_CONSUMER_MAX_BATCHES": 50,
        }
        consumer_settings = {
            "FRONTERA_SCHEDULER_SKIP_START_REQUESTS": True,
        }
        scrapers = Task(
            "scrapers",
            "hcf_crawlmanager.py",
            init_args=[
                "mysite.com",
                self.frontier,
                self.args.slot_prefix,
                "--loop-mode=60",
                f"--frontera-settings-json={json.dumps(consumer_frontera_settings)}",
                f"--max-running-jobs={self.args.max_running_jobs}",
                f"--job-settings={json.dumps(consumer_settings)}",
            ],
        )

        discover.add_next_task(scrapers)

        return (discover,)


if __name__ == '__main__':
    manager = MyArticlesGraphManager()
    manager.run()
```
Let's dig into the details of this code. The workflow is defined via the `configure_workflow()` method. This method defines two tasks: `discover` and `scrapers`. The `discover` task is a spider task. It schedules the producer, using the same frontera and scrapy settings as before, so they don't need further explanation. The `SpiderTask` class is a wrapper that defines the scheduling of a spider. The first parameter is the name of the task, the second one is the name of the spider, and then we pass the `job_settings` and `frontera_settings_json` parameters.
The `scrapers` task is a script task. The first parameter is the task name, the second the script name, and then comes a list of arguments, which are the same ones we passed manually to `hcf_crawlmanager.py` in the previous sections, except that we are passing an additional Scrapy setting: `FRONTERA_SCHEDULER_SKIP_START_REQUESTS`. This setting instructs the consumer not to execute the start requests. In the previous section this setting was not required because we had separated the consumer and producer code. Here we need it in order to prevent the consumer from performing the discovery stage.
The workflow declaration is completed by adding the scrapers task as a next task of the discover task, so when discover finishes, scrapers is scheduled. The return value of the `configure_workflow()` method is a tuple of tasks which will be the root tasks. In this case, only `discover` will be the root. In order to execute this workflow, the script must be invoked in this way:
$ python manager_articles.py --root-jobs
The same rules described before for `hcf_crawlmanager.py` apply regarding the target ScrapyCloud project where the jobs are scheduled.
In some use cases the producer can take a long time to complete. However, during its run, requests are already being sent to the frontier. If you don't want the scrapers task to wait for the producer to complete and you want to start the consumers as soon as possible, you can modify the workflow slightly by adding the `wait_time` parameter to the scrapers task, and returning both the discover and scrapers tasks as root jobs, instead of adding one as the next task of the other:
```python
        (...)
        scrapers = Task(
            "scrapers",
            "hcf_crawlmanager.py",
            init_args=[
                "mysite.com",
                self.frontier,
                self.args.slot_prefix,
                "--loop-mode=60",
                f"--frontera-settings-json={json.dumps(consumer_frontera_settings)}",
                f"--max-running-jobs={self.args.max_running_jobs}",
                f"--job-settings={json.dumps(consumer_settings)}",
            ],
            wait_time=600,
        )

        return (discover, scrapers)
```
The addition of `wait_time=600` instructs the scrapers task to wait 10 minutes before actually starting, thus giving the discover task a margin to generate some requests for the consumers to start with (if `hcf_crawlmanager.py` doesn't find any request in the target slots, it will consider that the crawl is complete and will terminate).
What if the producer spider needs a very long time to complete and a failure in the middle stops it prematurely? This is the typical case of a broad crawler:
```python
from scrapy import Spider, Request


class MySiteArticlesSpider(Spider):

    name = 'mysite.com'

    def parse(self, response):
        (... link discovery logic ...)
        if <some condition>:
            # articles links
            yield Request(..., callback=self.parse_article)
        else:
            # other links
            yield Request(..., callback=self.parse)

    def parse_article(self, response):
        (... actual implementation of the article parsing method ...)
```
It is very expensive to start again from zero if the producer stops. So here we can also save the state of the discovery spider in the frontier. We need to write the exploration links to a different set of slots than the one we use for the article links, so we need a way to map the spider's own requests to a different set of slots. That feature is provided by the scrapy-frontera setting `FRONTERA_SCHEDULER_CALLBACK_SLOT_PREFIX_MAP`. So, you will modify the previous `configure_workflow()` method by changing the producer section in this way:
```python
        (...)
        producer_frontera_settings = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_PRODUCER_FRONTIER": self.frontier,
            "HCF_PRODUCER_SLOT_PREFIX": "links",
        }
        producer_settings = {
            "FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER": "parse,parse_article",
            'FRONTERA_SCHEDULER_CALLBACK_SLOT_PREFIX_MAP': {
                'parse_article': f'{self.args.slot_prefix}/{self.args.max_running_jobs}',
            },
        }
        discover = Task(
            "discover",
            "hcf_crawlmanager.py",
            init_args=[
                "mysite.com",
                self.frontier,
                "links",
                "--loop-mode=60",
                f"--frontera-settings-json={json.dumps(producer_frontera_settings)}",
                f"--job-settings={json.dumps(producer_settings)}",
            ],
        )
```
Let's review the changes performed:

- Notice the addition of `parse` into the `FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER` list. Now we will send to the frontier not only the requests with the `parse_article()` callback, as previously, but also the ones with the `parse()` callback.
- We changed the value of `HCF_PRODUCER_SLOT_PREFIX` to `links` and removed `HCF_PRODUCER_NUMBER_OF_SLOTS` from `producer_frontera_settings`, so the number of slots takes the default value `1`. So by default the producer will write all requests to a single slot, `links0`.
- We now configure the target slots for the consumer via `FRONTERA_SCHEDULER_CALLBACK_SLOT_PREFIX_MAP`.
- The `discover` task is now an HCF crawl manager script `Task` instead of a `SpiderTask`. This HCF crawl manager will configure the producer also as a consumer, and will take care of its periodic scheduling.
This will result in a producer whose first job scrapes the start URL, extracts links, both for itself and for the consumer, and finishes. The HCF crawl manager will then detect the presence of requests in the `links0` slot and schedule the producer again, which will consume all the links saved in the frontier, generate more links and stop. Once the set of requests it generates for itself is big enough, they will be flushed to the frontier before the spider finishes, so it will reach a continuous regime of reading/writing without stopping, except in an abnormal situation. Such a premature stop will not, however, significantly affect the producer crawl process. Once the producer is rescheduled by the HCF crawl manager, it will continue by reading the next batch after the last one read in the failed job. If that last batch was not fully processed, some requests will be lost.
If you want to avoid this collateral loss, you can pass an additional HCF frontera setting: `HCF_CONSUMER_DELETE_BATCHES_ON_STOP=True`. By default, batches are deleted immediately after being read, and that is the cause of the loss when the job finishes abnormally. With this setting you ensure that batches are deleted only when the spider stops. However, if you provide this setting with no extra configuration, since batches are deleted on finish and the producer never reaches that stage, it will loop, reading the same batches again and again. So when using this setting you need to provide additional settings in order to ensure that the producer finishes each time it reads some number of batches. You must play with `HCF_CONSUMER_MAX_BATCHES` and `MAX_NEXT_REQUESTS`. `MAX_NEXT_REQUESTS` governs how many requests are read on each read request. This is a Frontera setting (not an HCF frontera backend one), and its default value is 64. The number of batches read before stopping will depend on this value and on the batch sizes generated by the producer (see `HCF_PRODUCER_BATCH_SIZE` in the hcf backend documentation). Roughly, the number of batches read on each read cycle will be `int(MAX_NEXT_REQUESTS / <batch size>)`. Take special care to set `HCF_CONSUMER_MAX_BATCHES` to a number equal to or smaller than this one; if it is bigger, the spider will read some batches more than once, thus losing performance. Also, `HCF_CONSUMER_MAX_BATCHES` must be bigger than 0, which is the default value for this setting. 0 means no limit on the number of batches to read, and this in combination with `HCF_CONSUMER_DELETE_BATCHES_ON_STOP=True` would result in a job reading the same batches forever and never stopping.
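The arithmetic is simple enough to sketch with illustrative numbers (a rough check assuming the producer writes full-size batches of 100 requests, which is the HCF maximum; real batch sizes vary):

```python
# Illustrative tuning arithmetic, assuming full batches of 100 requests
MAX_NEXT_REQUESTS = 10000
batch_size = 100                                           # HCF batch size limit
batches_per_read_cycle = MAX_NEXT_REQUESTS // batch_size   # -> 100

# HCF_CONSUMER_MAX_BATCHES should be > 0 and <= batches_per_read_cycle,
# so 100 is a safe choice with these numbers.
HCF_CONSUMER_MAX_BATCHES = 100
assert 0 < HCF_CONSUMER_MAX_BATCHES <= batches_per_read_cycle
```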
Here you have a compromise: the smaller the number of batches read per job, the bigger the proportion of tail requests in the total job time and the higher the cost of setting up and shutting down SC jobs, so performance will decay. On the other hand, the bigger the number of batches read per job, the more memory you will use, and the more links will be reprocessed in a new job if the current one finishes abnormally. If the spider is stable enough this second effect will be insignificant; the memory factor, however, will still be very important. A typical starting point is to read around 100 batches per job, so an appropriate set of settings for this case would be:
```python
        producer_frontera_settings = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_PRODUCER_FRONTIER": self.frontier,
            "HCF_PRODUCER_SLOT_PREFIX": "links",
            "HCF_CONSUMER_MAX_BATCHES": 100,
            "MAX_NEXT_REQUESTS": 10000,
        }
```
With the combination of settings above you will never read fewer than 100 batches per job, and the number of requests read will be between 10000 and 10099, depending on the exact batch sizes, which are not constant. Starting from these values you can tune your project further, as ultimately the best performance will depend on the exact case. As the maximum size of an HCF batch is 100, this combination also ensures that `HCF_CONSUMER_MAX_BATCHES` is equal to or smaller than `MAX_NEXT_REQUESTS / <batch size>`.
When you have a large crawl split into many jobs, delivering the data as a single file can become a pain. Shub-workflow provides a deliver script that facilitates this task: it reads the items from all the jobs of the given target spider that have the same `FLOW_ID` tag as the delivery script. As all the jobs scheduled by a graph manager and an HCF crawl manager inherit the same flow id, it is ensured that all, and only, the spider jobs belonging to the same crawl will be read and delivered by the deliver script.
Let's suppose we want to deliver our job items to S3 (the script also supports GCS and local file storage), so we would add to our project the script `scripts/deliver.py`:
```python
from shub_workflow.deliver import S3DeliverScript


class MyDeliverScript(S3DeliverScript):

    s3_bucket_name = 'myproject_bucket'


if __name__ == '__main__':
    deliver = MyDeliverScript()
    deliver.run()
```
The above subclassing is the simplest possible one, and only overrides a single attribute. But this class is designed for easy overriding of multiple attributes and methods, in order to provide very flexible customization for the specific needs of a project (see the DeliverScript docstring). So, in our example, once the crawl is finished, which happens when the consumer task is completed, we want to trigger a delivery:
```python
    def configure_workflow(self):
        (...)
        deliverTask = Task('deliver', 'deliver.py', init_args=['mysite.com'])
        scrapers.add_next_task(deliverTask)
        (...)
```
Finally, a good workflow must include cleaning up the frontier slots for a future crawl. For our tutorial example:
```python
    def configure_workflow(self):
        (...)
        clean = Task('clean', 'hcfpal.py', init_args=["delete", self.frontier, self.args.slot_prefix])
        scrapers.add_next_task(clean)
        (...)
```
And additionally, for the broad crawler example:
```python
    def configure_workflow(self):
        (...)
        cleanLinks = Task('cleanLinks', 'hcfpal.py', init_args=["delete", self.frontier, 'links'])
        discover.add_next_task(cleanLinks)
        (...)
```
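Putting the pieces together, a complete `configure_workflow()` for the tutorial example could look roughly like this (a sketch assembled from the fragments above, not a definitive implementation; adjust names and settings to your project):

```python
    def configure_workflow(self) -> Tuple[Task]:
        # producer: schedules the discovery spider, sending parse_article requests to HCF
        producer_frontera_settings = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_PRODUCER_FRONTIER": self.frontier,
            "HCF_PRODUCER_SLOT_PREFIX": self.args.slot_prefix,
            "HCF_PRODUCER_NUMBER_OF_SLOTS": self.args.max_running_jobs,
        }
        producer_settings = {
            "FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER": "parse_article",
        }
        discover = SpiderTask(
            "discover",
            "mysite.com",
            job_settings=producer_settings,
            frontera_settings_json=json.dumps(producer_frontera_settings),
        )

        # consumers: an hcf_crawlmanager.py job scheduling parallel consumer spiders
        consumer_frontera_settings = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_CONSUMER_FRONTIER": self.frontier,
            "HCF_CONSUMER_MAX_BATCHES": 50,
        }
        consumer_settings = {
            "FRONTERA_SCHEDULER_SKIP_START_REQUESTS": True,
        }
        scrapers = Task(
            "scrapers",
            "hcf_crawlmanager.py",
            init_args=[
                "mysite.com",
                self.frontier,
                self.args.slot_prefix,
                "--loop-mode=60",
                f"--frontera-settings-json={json.dumps(consumer_frontera_settings)}",
                f"--max-running-jobs={self.args.max_running_jobs}",
                f"--job-settings={json.dumps(consumer_settings)}",
            ],
        )

        # delivery and cleanup, run after the consumers complete
        deliver = Task("deliver", "deliver.py", init_args=["mysite.com"])
        clean = Task("clean", "hcfpal.py", init_args=["delete", self.frontier, self.args.slot_prefix])

        discover.add_next_task(scrapers)
        scrapers.add_next_task(deliver)
        scrapers.add_next_task(clean)

        return (discover,)
```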