Commit ce2c9c78 authored by sim

Scrapers: major refactor and many new features

parent 8efa9762
import logging
import urllib3
from pathlib import Path
from urllib.parse import urlencode
from scrapy.spiders import Spider
from scrapy.signals import response_received, spider_error, item_dropped
from scrapy.http.request import Request as BaseRequest
from gargantext.utils.json import json_dumps
from gargantext.utils.dates import datetime
from gargantext.utils.convert import to_int, to_bool, to_str
from .responses import TextResponse, HtmlResponse, XmlResponse, JsonResponse, \
RISResponse
@@ -29,35 +33,85 @@ class Request(BaseRequest):
class Scraper(Spider):
MAX_COUNT = 1000
BATCH_SIZE = 100
DEBUG_DIR = '/tmp'
ARGUMENTS = {
'user': (to_str, None),
'corpus': (to_int, None),
'callback': (to_str, None),
'url': (to_str, None),
'limit': (to_int, None),
'query': (to_str, None),
'count_only': (to_bool, False),
'report_every': (to_int, BATCH_SIZE),
}
def __init__(self, *args, **kwargs):
# The default __init__ method takes any spider arguments and copies them
# to the spider as attributes: filter the arguments for security purposes,
# converting them safely and falling back to the declared default when
# needed.
spider_args = {
k: convert(kwargs.get(k)) or default
for k, (convert, default) in self.ARGUMENTS.items()
}
super().__init__(*args, **spider_args)
self.logger.info("Arguments: %r", spider_args)
default_parser = getattr(self, 'default_parser', None)
if default_parser and not hasattr(self, 'parse'):
# XXX Use setattr to bypass pylint warning...
setattr(self, 'parse', getattr(self, default_parser))
self._total = None
self.counter = 0
self.events = []
# For errors/events reporting
self.http = urllib3.PoolManager()
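# Illustration only (not part of this module): with the ARGUMENTS table
# above and raw spider arguments such as
#     {'limit': '50', 'count_only': 'true', 'corpus': 'not-a-number'}
# the comprehension in __init__ is expected to yield roughly
#     {'limit': 50, 'count_only': True, 'corpus': None, ...}
# assuming the gargantext.utils.convert helpers return None on missing or
# invalid input, so that the declared default is used as a fallback.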
@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
spider = super().from_crawler(crawler, *args, **kwargs)
crawler.signals.connect(spider.on_error, signal=spider_error)
crawler.signals.connect(spider.on_item_dropped, signal=item_dropped)
crawler.signals.connect(spider.trace, signal=response_received)
spider.report('event', events=[{
"level": "INFO",
"message": "Scraper %s started" % spider.name,
}])
return spider
def start_requests(self):
if self.url and self.url.startswith('file:'):
yield self.request(self.url)
else:
yield from self.dispatch()
def request(self, url, callback=None, method='GET', headers=None,
body=None, cookies=None, meta=None, encoding='utf-8', priority=0,
dont_filter=False, errback=None, flags=None, params=None):
if callback is None:
# Callback has to be specified explicitly to set an errback, so
# use the `parse` method or a no-op by default.
callback = getattr(self, 'parse', lambda x: None)
if errback is None:
errback = self.errback
return Request(url, callback, method, headers, body, cookies, meta,
encoding, priority, dont_filter, errback, flags, params)
def reach_limit(self):
self.counter += 1
return self.count is None or self.counter >= self.count
@property
def logger_name(self):
return 'scrapers.%s' % self.name
@@ -68,18 +122,79 @@ class Scraper(Spider):
return logging.LoggerAdapter(logger, {'spider': self})
@property
def total(self):
return self._total
@total.setter
def total(self, value):
self.logger.info("Total document count: %s", value)
self._total = to_int(value)
@property
def count(self):
limit = self.limit or self.MAX_COUNT
if self.total is None:
return limit
return None if limit is None else min(limit, self.total)
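# Worked example (illustrative values): with MAX_COUNT = 1000,
#   limit=None, total=None -> count == 1000 (falls back to MAX_COUNT)
#   limit=None, total=250  -> count == 250  (bounded by the reported total)
#   limit=50,   total=250  -> count == 50   (bounded by the requested limit)
#   limit=5000, total=250  -> count == 250  (never exceeds the total)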
def add_event(self, level, message):
levels = set(["CRITICAL", "FATAL", "ERROR", "WARNING", "INFO", "DEBUG"])
level_code = getattr(logging, level, 0) if level in levels else 0
self.logger.log(level_code, message)
self.events.append({
"level": level,
"message": message,
})
def report(self, path, succeeded=0, failed=0, remaining=None, events=None):
# Silently fail if not bound to a crawler or if the callback is undefined
if not getattr(self, 'crawler', None) or not self.callback:
return
assert path in ('output', 'error', 'event')
events = self.events + (events or [])
self.events = []
url = '%s/%s' % (self.callback, path)
status = dict(succeeded=succeeded,
failed=failed,
remaining=remaining,
events=events)
data = { k: v for k, v in status.items() if v is not None }
try:
self.http.request('POST', url,
body=json_dumps(data).encode('utf-8'),
headers={'Content-Type': 'application/json'})
except urllib3.exceptions.HTTPError:
# Ignore HTTP errors when reporting...
pass
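# For illustration, a report('output', succeeded=200, failed=3, remaining=0)
# call would POST a JSON body along these lines to '<callback>/output'
# (None values are stripped; the callback endpoint itself is outside this commit):
#     {"succeeded": 200, "failed": 3, "remaining": 0,
#      "events": [{"level": "WARNING", "message": "..."}]}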
def on_error(self, failure, response, spider):
# FIXME Any reporting to do here?
self.logger.debug("Catched error: %r %r %r", failure, response, spider)
def on_item_dropped(self, item, response, exception, spider):
reason = ' '.join(exception.args)
self.add_event("ERROR", "Ignore document: %s" % reason)
def errback(self, failure):
exc = failure.value
response = getattr(exc, 'response', None)
message = "Error in %s scraper" % self.name
if response is not None:
message = "%s while fetching %s, got HTTP %s" % (
message, response.request.url, response.status)
message = "%s: %s" % (message, failure.getErrorMessage())
self.add_event("FATAL", message)
def trace(self, response, request, spider):
content_type = response.headers.get('content-type', b'').decode()
...
from scrapy.exceptions import DropItem
class DropDocument(DropItem):
pass
from datetime import datetime
from scrapy.item import Item, Field
from scrapy.loader import ItemLoader
from scrapy.loader.processors import TakeFirst, Compose, MapCompose, Identity
from .processors import filter_empty
DateTime = Field(serialize=str)
String = Field()
class Document(Item):
id = String
title = String
abstract = String
source = String
url = String
lang = String
authors = String
publication = DateTime
creation = DateTime
class DocumentLoader(ItemLoader):
default_item_class = Document
default_output_processor = TakeFirst()
to_datetime = Compose(MapCompose(str.strip, int), filter_empty, lambda args: datetime(*args))
publication_out = to_datetime
creation_out = to_datetime
authors_out = Identity()
def __init__(self, selector, *args, **kwargs):
kwargs['selector'] = selector
super().__init__(*args, **kwargs)
def add_xpaths_text(self, xpaths):
for field_name, xpath in xpaths.items():
self.add_xpath(field_name, '%s/text()' % xpath)
def add_values(self, values):
for field_name, value in values.items():
self.add_value(field_name, value)
def parse(self, obj):
raise NotImplementedError("don't use DocumentLoader directly.")
def load(self):
self.parse(self.selector)
return self.load_item()
from .document import Document, DocumentLoader
import logging
from scrapy.item import Item
from .fields import ListType, DropDocument
logger = logging.getLogger(__name__)
class TypedItem(Item):
def preprocess(self):
for field_name, field_type in self.fields.items():
if not isinstance(field_type, ListType):
self[field_name] = next(iter(self.get(field_name, [])), None)
if self.get(field_name) is None and 'default' in field_type:
self[field_name] = field_type.get('default')
def validate(self):
missing = []
invalid = []
for field_name, field_type in self.fields.items():
required = field_type.get('required', False)
value = self.get(field_name)
if required and value is None:
missing.append(field_name)
if not isinstance(value, field_type):
invalid.append((field_name, type(value)))
errors = []
if missing:
errors.append("Missing fields: %s" % ', '.join(missing))
if invalid:
fields = ("%s (%r expected, got %s)" % (
self[f], self.fields[f], t.__name__) for f, t in invalid)
errors.append("Invalid fields: %s" % ', '.join(fields))
if errors:
raise DropDocument(*errors)
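# A minimal usage sketch (not part of the commit). Import paths are assumed
# to mirror the modules in this diff; String/Int come from the fields module
# defined further below.
from gargantext.datasource.items.base import TypedItem      # assumed path
from gargantext.datasource.items.fields import String, Int  # assumed path

class Book(TypedItem):
    title = String(required=True)
    year = Int

book = Book(title=['  A title  '], year=[2020])
book.preprocess()   # unwraps single-item lists and applies declared defaults
book.validate()     # raises DropDocument when required fields are missing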
import logging
from scrapy.loader import ItemLoader
from gargantext.utils.dates import DEFAULT_DATETIME, to_datetime
from gargantext.utils.lang import lang_name
from .base import TypedItem
from .fields import DateTime, String, Int, Real, Dict, List, Either, \
DropDocument
logger = logging.getLogger(__name__)
class Document(TypedItem):
id = String(required=True)
title = String(required=True, default='')
abstract = String(required=True, default='')
source = String(required=True, default='')
authors = List(Either(String, Dict), required=True, default=[])
url = String(required=True, default='')
publication = DateTime(required=True, default=DEFAULT_DATETIME)
lang = String
lang_detected = String
lang_detected_confidence = Real
creation = DateTime
type = String
doi = String
isbn = String
issue = String
struct_inst = List(Int)
struct_dept = List(Int)
struct_labs = List(Int)
struct_team = List(Int)
publication_year = Int
publication_month = Int
publication_day = Int
creation_year = Int
creation_month = Int
creation_day = Int
def preprocess(self):
warnings = super().preprocess() or []
for field_name in ['publication', 'creation']:
if field_name in self:
raw = self.get(field_name)
default = self.fields[field_name].get('default')
required = self.fields[field_name].get('required')
try:
value = to_datetime(raw, default=default) if raw else default
except ValueError as e:
value = default if required else None
warnings.append("%s: %s" % (field_name, ' '.join(e.args)))
self[field_name] = value
if value:
self[field_name + '_year'] = int(value.strftime("%Y"))
self[field_name + '_month'] = int(value.strftime("%m"))
self[field_name + '_day'] = int(value.strftime("%d"))
self['lang'] = lang_name(self.get('lang'))
return warnings
def validate(self):
warnings = []
if not self.get('title'):
warnings.append('No title, is something wrong?')
if not self.get('title') and not self.get('abstract'):
raise DropDocument('Missing both title and abstract')
return warnings + (super().validate() or [])
class DocumentLoader(ItemLoader):
default_item_class = Document
def __init__(self, selector, *args, **kwargs):
kwargs['selector'] = selector
super().__init__(*args, **kwargs)
def add_xpaths_text(self, xpaths):
for field_name, xpath in xpaths.items():
self.add_xpath(field_name, '%s/text()' % xpath)
def add_values(self, values):
self.add_value(None, values)
def parse(self, obj):
raise NotImplementedError("don't use DocumentLoader directly.")
def load(self):
self.parse(self.selector)
return self.load_item()
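# Illustrative subclass sketch (hypothetical XPaths, not a real source): the
# intended pattern is to override parse(), feed fields through the
# add_xpaths_text()/add_values() helpers, then call load() from a callback.
class ExampleDocumentLoader(DocumentLoader):
    def parse(self, article):
        self.add_xpaths_text({
            'id': './id',
            'title': './title',
            'abstract': './summary',
        })
        self.add_values({'source': 'Example source'})

# Typical use inside a scraper callback:
#     for article in response.xpath('//article'):
#         yield ExampleDocumentLoader(article).load()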
import logging
from datetime import datetime
from scrapy.item import Field
from ..exceptions import DropDocument
__all__ = ['FieldType', 'ListType', 'EitherType', 'Any', 'DateTime', 'String',
'Int', 'Real', 'List', 'Dict', 'Either', 'DropDocument']
logger = logging.getLogger(__name__)
class FieldType(Field):
def __init__(self, name, *args, **kwargs):
self.__name__ = name
self.args = list(args)
super().__init__(**kwargs)
def check_type(self, instance):
typ = self.get('type')
return isinstance(instance, typ) if typ else True
def __call__(self, *args, **kwargs):
return type(self)(self.__name__, *args, **kwargs)
def __instancecheck__(self, instance):
return self.check_type(instance) or \
(not self.get('required') and instance is None)
def __repr__(self):
if not self.args and not self:
return self.__name__
args = ', '.join(str(a) for a in self.args)
opts = ', '.join(('%s=%s' % (k, getattr(v, '__name__', repr(v)))
for k, v in self.items()))
params = '%s, %s' % (args, opts) if args and opts else args or opts
return '%s(%s)' % (self.__name__, params)
class ListType(FieldType):
def __init__(self, name, item_type, *args, **kwargs):
kwargs.setdefault('item_type', item_type)
super().__init__(name, *args, **kwargs)
self.args = [item_type] + self.args
def check_type(self, instance):
item_type = self.get('item_type')
return (isinstance(instance, list) and
all(isinstance(item, item_type) for item in instance))
class EitherType(FieldType):
def check_type(self, instance):
return any(isinstance(instance, t) for t in self.args)
Any = FieldType('Any')
DateTime = FieldType('DateTime', type=datetime)
String = FieldType('String', type=str)
Int = FieldType('Int', type=int)
Real = FieldType('Real', type=float)
Dict = FieldType('Dict', type=dict)
List = ListType('List', Any)
Either = EitherType('Either')
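# Quick illustration of the resulting checks (runnable against this module;
# the import path is assumed):
from gargantext.datasource.items.fields import String, Int, List, Either

assert isinstance("abc", String)            # type=str matches
assert isinstance(None, String)             # not required, so None is allowed
assert isinstance([1, 2], List(Int))        # every item is an int
assert not isinstance([1, "x"], List(Int))  # mixed item types fail
assert isinstance(3, Either(Int, String))   # matches one of the alternatives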
# XXX This is needed because database and models depend on Django
_DJANGO_SETUP = False
if not _DJANGO_SETUP:
import django
django.setup()
_DJANGO_SETUP = True
import logging
from gargantext.models.nodes import DocumentNode, CorpusNode
from gargantext.core.db import Session
from gargantext.utils.lang import lang_detect
logger = logging.getLogger(__name__)
class PreprocessPipeline(object):
def add_warnings(self, scraper, warnings):
if warnings:
for warn in warnings:
scraper.add_event("WARNING", warn)
def process_item(self, item, scraper):
w = item.preprocess()
self.add_warnings(scraper, w)
return item
class ValidationPipeline(PreprocessPipeline):
def process_item(self, item, scraper):
w = item.validate()
self.add_warnings(scraper, w)
return item
class LangDetectPipeline(object):
THRESHOLD = 0
def process_item(self, item, scraper):
parts = [item.get('title'), item.get('abstract')]
text = ' '.join((p or '' for p in parts)).strip()
if text:
lang, confidence = lang_detect(text)
if confidence >= self.THRESHOLD:
item['lang_detected'] = lang
item['lang_detected_confidence'] = confidence
else:
logger.warning(
"Detected %s on %r with insufficient confidence of %s",
lang, (text[:75] + '..') if len(text) > 75 else text,
confidence)
return item
class DatabasePipeline(object):
def open_spider(self, scraper):
if not scraper.user:
raise ValueError("No user specified")
if not scraper.corpus:
raise ValueError("No corpus specified")
self.db = Session()
self.db.login(scraper.user)
self.corpus = self.db.query(CorpusNode).filter_by(id=scraper.corpus).one_or_none()
def close_spider(self, scraper):
self.corpus["resources"] = [{"type": 0}]
self.corpus.save_hyperdata()
self.db.commit()
def process_item(self, item, scraper):
doc = DocumentNode(name=item.get('title')[:DocumentNode.NAME_MAXLEN],
parent_id=self.corpus.id,
hyperdata=dict(item))
self.corpus.related.append(doc)
if scraper.counter % scraper.BATCH_SIZE == 0:
self.db.commit()
return item
class ReportingPipeline(object):
def __init__(self, crawler):
self.crawler = crawler
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def report(self, path, scraper, completed=False):
# See https://doc.scrapy.org/en/latest/topics/stats.html
stats = self.crawler.stats
# See https://doc.scrapy.org/en/latest/topics/api.html#stats-collector-api
scraped_count = stats.get_value('item_scraped_count', 0)
dropped_count = stats.get_value('item_dropped_count', 0)
pending_count = None if scraper.count is None else \
max(0, scraper.count - scraped_count)
if completed and pending_count is not None and pending_count > 0:
inflexion = "s were" if pending_count > 1 else " was"
message = "%s item%s never fetched" % (pending_count, inflexion)
scraper.add_event("WARNING", message)
dropped_count += pending_count
pending_count = 0
scraper.report(path,
succeeded=scraped_count,
failed=dropped_count,
remaining=pending_count)
def close_spider(self, scraper):
self.report('output', scraper, completed=True)
def process_item(self, item, scraper):
scraped_count = self.crawler.stats.get_value('item_scraped_count', 0)
if scraped_count % scraper.report_every == 0:
self.report('event', scraper)
return item
from scrapy.utils.iterators import xmliter_lxml as xmliter
from gargantext.datasource import Scraper
from gargantext.datasource.items import DocumentLoader
from gargantext.datasource.responses import XmlResponse
@@ -34,6 +34,7 @@ class PubmedDocumentLoader(DocumentLoader):
'title': 'MedlineCitation/Article/ArticleTitle',
'abstract': 'MedlineCitation/Article/Abstract/AbstractText',
'source': 'MedlineCitation/Article/Journal/Title',
# FIXME Language of the article, but abstracts seem to be in English
'lang': 'MedlineCitation/Article/Language',
# https://www.nlm.nih.gov/bsd/licensee/elements_descriptions.html#journalissue
'publication': 'MedlineCitation/Article/Journal/JournalIssue/PubDate/' + PD,
@@ -60,11 +61,12 @@ class PubmedScraper(Scraper):
webenv = None
querykey = None
# 100000 is a hard limit of pubmed, see <https://www.ncbi.nlm.nih.gov/books/NBK25499/>
retmax = min(Scraper.BATCH_SIZE, 100000)
def dispatch(self):
if not self.webenv or not self.querykey or self.total is None:
yield self.request('%s/esearch.fcgi' % self.base_url,
callback=self.parse_esearch,
params={
'db': 'pubmed',
@@ -77,10 +79,10 @@ class PubmedScraper(Scraper):
yield from self.request_results()
def request_results(self):
if not self.count_only and self.total and self.webenv and self.querykey:
# XXX PubMed documentation is confusing: need to start at 0, not 1
for retstart in range(0, self.count, self.retmax):
yield self.request('%s/efetch.fcgi' % self.base_url,
callback=self.parse_efetch,
params={
'db': 'pubmed',
@@ -95,7 +97,7 @@ class PubmedScraper(Scraper):
def parse_esearch(self, response):
result = response.xpath('/eSearchResult')
self.total = result.xpath('./Count/text()').extract_first()
self.webenv = result.xpath('./WebEnv/text()').extract_first()
self.querykey = result.xpath('./QueryKey/text()').extract_first()
@@ -104,3 +106,5 @@ class PubmedScraper(Scraper):
def parse_efetch(self, response):
for article in xmliter(response, 'PubmedArticle'):
yield PubmedDocumentLoader(article).load()
if self.reach_limit():
return
import logging
from gargantext.datasource import Scraper, RISResponse
from gargantext.datasource.items import Document, DocumentLoader
from gargantext.utils.dates import to_datetime, datetime
__all__ = ['RISScraper']
logger = logging.getLogger(__name__)
class RISDocumentLoader(DocumentLoader):
def parse(self, entry):
if not (entry.get('id') or '').strip():
entry['id'] = entry.get('doi')
if not (entry.get('title') or '').strip():
entry['title'] = entry.get('primary_title')
if not (entry.get('source') or '').strip():
entry['source'] = entry.get('secondary_title')
# In Scopus PY may refer to the first day of the publication period
# with the year omitted, in this case we have to get year from Y2
# FIXME Sometimes month is also omitted, how to be aware of this?
publication = to_datetime(entry.get('publication'),
default=datetime(1, 1, 1))
if publication.year == 1:
secondary = entry.get('secondary_publication')
if secondary:
secondary = to_datetime(secondary)
entry['publication'] = publication.replace(year=secondary.year)
self.add_values({
k: v for k, v in entry.items() if k in Document.fields
})
class RISScraper(Scraper):
name = 'ris'
expects = RISResponse
def parse(self, response):
for entry in response.parse():
yield RISDocumentLoader(entry).load()
if self.reach_limit():
return
@@ -282,3 +282,10 @@ DOWNLOAD_HANDLERS = {
}
DOWNLOAD_DELAY = 0.6
CONCURRENT_REQUESTS_PER_IP = 8
ITEM_PIPELINES = {
'gargantext.datasource.pipelines.PreprocessPipeline': 100,
'gargantext.datasource.pipelines.LangDetectPipeline': 200,
'gargantext.datasource.pipelines.ValidationPipeline': 300,
'gargantext.datasource.pipelines.DatabasePipeline': 900,
'gargantext.datasource.pipelines.ReportingPipeline': 999,
}
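# Sketch of launching a scraper with the arguments declared in
# Scraper.ARGUMENTS (user/corpus ids and callback URL are placeholders;
# the actual runner used by gargantext may differ):
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())
process.crawl('pubmed',
              user='1', corpus='42',
              query='cancer immunotherapy',
              limit='200',
              callback='http://localhost:8000/scrapers/report')
process.start()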