Crawling of Stanford dataspace, and simple indexes #11

Closed · wants to merge 4 commits
10 changes: 6 additions & 4 deletions datalad_crawler/nodes/crawl_url.py
@@ -27,7 +27,8 @@ class crawl_url(object):

"""
def __init__(self,
url=None, matchers=None,
url=None,
matchers=None,
input='url',
failed=None,
cache_redirects=True,
@@ -64,7 +65,8 @@ def _visit_url(self, url, data):
return
# this is just a cruel first attempt
lgr.debug("Visiting %s" % url)

if 'initial_url' not in data:
data['initial_url'] = url
try:
retry = 0
orig_url = url
@@ -96,12 +98,12 @@ def _visit_url(self, url, data):

data_ = updated(data, zip(self._output, (page, url)))
yield data_

# now recurse if matchers were provided
matchers = self._matchers
if matchers:
lgr.debug("Looking for more URLs at %s using %s", url, matchers)
for matcher in (matchers if isinstance(matchers, (list, tuple)) else [matchers]):
matchers = matchers if isinstance(matchers, (list, tuple)) else [matchers]
for matcher in matchers:
for data_matched in matcher(data_):
if 'url' not in data_matched:
lgr.warning("Got data without a url from %s" % matcher)
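For context, a minimal sketch of how a `crawl_url` node with follow matchers could be wired after this change; the URL and the regular expression below are made-up placeholders, not part of this PR:

```python
# Sketch only, assuming the package layout touched by this PR; values are placeholders.
from datalad_crawler.nodes.crawl_url import crawl_url
from datalad_crawler.nodes.matches import a_href_match

# `matchers` may be a single matcher or a list; the node normalizes it
# internally before recursing into matched URLs.
crawler = crawl_url(
    "http://example.com/listing/",
    matchers=a_href_match('.*/sub[0-9]+/?$'),  # follow sub-directory links
)
# With this PR, each record yielded while visiting pages also carries
# 'initial_url', i.e. the URL the crawl started from.
```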
51 changes: 41 additions & 10 deletions datalad_crawler/nodes/matches.py
@@ -41,9 +41,26 @@ class ExtractorMatch(object):
"""Generic matching extractor
"""
def __init__(self, query, input='response', output='match', pop_input=False,
allow_multiple=False, xpaths=None, csss=None, min_count=None,
max_count=None):
""""""
allow_multiple=False, xpaths=None, csss=None,
min_count=None, max_count=None, finalize_each=True):
"""

Parameters
----------
query:
input:
output:
pop_input:
allow_multiple:
xpaths:
csss:
min_count:
max_count:
finalize_each: bool, optional
If True, finalize() is called after each call of the node, so the
min/max_count checks are done per call instead of over the entire
duration of node use
"""
# TODO: define arguments
self.query = query
# allow_multiple concerns only extraction of additional xpaths and csss
@@ -55,11 +72,15 @@ def __init__(self, query, input='response', output='match', pop_input=False,
self._pop_input = False
self._min_count = min_count
self._max_count = max_count
self._finalize_each = finalize_each
self.count = 0
self._finalized = True

def _select_and_extract(self, selector, query, data): # pragma: no cover
raise NotImplementedError

def __call__(self, data):
self._finalized = False
input = data.pop(self._input) if self._pop_input else data[self._input]

if not Response:
@@ -76,7 +97,9 @@ def __call__(self, data):
else:
selector = Selector(text=input)

count = 0
if 'url' in data:
# store URL for the page from where the match has happened
data['listing_url'] = data['url']
for entry, data_ in self._select_and_extract(selector, self.query, data):
data_ = updated(data_, {self._output: entry.extract()})
# now get associated xpaths, css, etc
@@ -95,22 +118,30 @@ def __call__(self, data):
data_[key] = key_extracted
# raise NotImplementedError("Don't know what to do yet with this one")
else:
lgr.warn(
lgr.warning(
"Got multiple selections for xpath query %s. "
"Keeping only the first one: %s" % (repr(selector_), key_extracted[0]))
data_[key] = key_extracted[0]
else:
data_[key] = key_extracted[0]
count += 1
self.count += 1
yield data_
if self._finalize_each:
self.finalize()

if self._min_count and count < self._min_count:
def finalize(self):
if self._finalized: # already done
return
if self._min_count and self.count < self._min_count:
raise ValueError("Did not match required %d matches (got %d) using %s"
% (self._min_count, count, self))
% (self._min_count, self.count, self))

if self._max_count and count > self._max_count:
if self._max_count and self.count > self._max_count:
raise ValueError("Matched more than %d matches (got %d) using %s"
% (self._max_count, count, self))
% (self._max_count, self.count, self))
self.count = 0
self._finalized = True



class ScrapyExtractorMatch(ExtractorMatch):
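To illustrate the new counting semantics with a simplified stand-in class (not the real `ExtractorMatch`): with `finalize_each=True` the min/max checks apply to each call separately, while with `finalize_each=False` the counter accumulates until `finalize()` is invoked, e.g. by the pipeline at the end of a run.

```python
# Simplified stand-in for the matcher node, only to show the counting
# semantics of `finalize_each`; not the real ExtractorMatch class.
class CountingMatcher:
    def __init__(self, min_count=None, finalize_each=True):
        self.min_count = min_count
        self.finalize_each = finalize_each
        self.count = 0

    def __call__(self, items):
        for item in items:
            self.count += 1
            yield item
        if self.finalize_each:
            self.finalize()

    def finalize(self):
        if self.min_count and self.count < self.min_count:
            raise ValueError("got %d matches, expected at least %d"
                             % (self.count, self.min_count))
        self.count = 0  # reset for the next run

# Per-call check: each call must yield at least 2 matches on its own.
per_call = CountingMatcher(min_count=2, finalize_each=True)

# Per-run check: individual calls may yield fewer matches, as long as the
# total reaches 2 by the time finalize() is called.
per_run = CountingMatcher(min_count=2, finalize_each=False)
list(per_run(["a"]))   # 1 match so far, no error
list(per_run(["b"]))   # 2 matches total
per_run.finalize()     # passes; would raise ValueError if the total were < 2
```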
22 changes: 13 additions & 9 deletions datalad_crawler/pipeline.py
@@ -90,18 +90,19 @@ class FinishPipeline(Exception):
PIPELINE_TYPES = (list, tuple)


def reset_pipeline(pipeline):
"""Given a pipeline, traverse its nodes and call .reset on them
def _call_recursively(pipeline, method_name):
"""Given a pipeline, traverse its nodes and call their method

Note: it doesn't try to call reset if a node doesn't have it
Note: it doesn't try to call the method if a node doesn't have it
"""
if pipeline:
for node in pipeline:
method = getattr(node, method_name, None)
if isinstance(node, PIPELINE_TYPES):
reset_pipeline(node)
elif hasattr(node, '__call__') and hasattr(node, 'reset'):
lgr.log(2, "Resetting node %s" % node)
node.reset()
_call_recursively(node, method_name)
elif method and hasattr(node, '__call__'):
lgr.log(2, "Calling %s of node %s", method_name, node)
method()


def run_pipeline(*args, **kwargs):
@@ -136,7 +137,7 @@ def _get_pipeline_opts(pipeline):
return opts, pipeline


def xrun_pipeline(pipeline, data=None, stats=None, reset=True):
def xrun_pipeline(pipeline, data=None, stats=None, reset=True, finalize=True):
"""Yield results from the pipeline.

"""
@@ -150,7 +151,7 @@ def _log(msg, *args):

if reset:
_log("Resetting pipeline")
reset_pipeline(pipeline)
_call_recursively(pipeline, 'reset')

# just for paranoids and PEP8-disturbed, since theoretically every node
# should not change the data, so having default {} should be sufficient
@@ -209,6 +210,9 @@ def _log(msg, *args):
# or all subsequent or may be go back and only skip that generated result
_log("got a signal that pipeline is 'finished'")

if finalize:
_log("Finalizing pipeline")
_call_recursively(pipeline, 'finalize')
# TODO: this implementation is somewhat bad since all the output logic is
# duplicated within xrun_pipeline_steps, but it is probably unavoidable because of
# loop option
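The helper above generalizes the former `reset_pipeline` so the same traversal can dispatch both `reset` and the new `finalize`; a standalone sketch of the idea with dummy nodes (not the real crawler nodes):

```python
# Standalone illustration of the generic traversal; only the dispatch logic
# mirrors _call_recursively, the node class is a dummy.
def call_recursively(pipeline, method_name):
    for node in pipeline or []:
        if isinstance(node, (list, tuple)):      # nested sub-pipeline
            call_recursively(node, method_name)
        elif callable(node):
            method = getattr(node, method_name, None)
            if method:                           # skip nodes lacking the method
                method()

class DummyNode:
    def __call__(self, data):
        yield data
    def reset(self):
        print("reset called")
    def finalize(self):
        print("finalize called")

pipeline = [DummyNode(), [DummyNode()]]          # nested pipeline structure
call_recursively(pipeline, 'reset')              # resets both nodes
call_recursively(pipeline, 'finalize')           # finalizes both nodes
```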
96 changes: 84 additions & 12 deletions datalad_crawler/pipelines/simple_with_archives.py
@@ -6,43 +6,100 @@
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""A pipeline for crawling a crcns dataset"""
"""A pipeline for crawling basic websites and annexing their content

It features:
- extraction of content from archives (see `a_href_match_` option)
- establishing a 3-branch workflow with the following branches:
  - incoming - where all content is downloaded as is
  - incoming-processed - where archives (if present) are automatically
    extracted based on the content of the incoming branch
  - master - where incoming-processed is merged, so you can introduce your
    own additions/changes/fixups to files and have them automatically
    merged with changes from the web as they propagate via the
    incoming-processed branch
"""

# Import necessary nodes
from ..nodes.crawl_url import crawl_url
from ..nodes.misc import fix_url
from ..nodes.matches import a_href_match
from ..nodes.matches import (
a_href_match,
a_text_match,
)
from ..nodes.misc import find_files
from ..nodes.misc import sub
from ..nodes.annex import Annexificator
from datalad.utils import assure_bool
from datalad_crawler.consts import DATALAD_SPECIAL_REMOTE, ARCHIVES_SPECIAL_REMOTE
from datalad.support.strings import get_replacement_dict

# Possibly instantiate a logger if you would like to log
# during pipeline creation
from logging import getLogger
lgr = getLogger("datalad.crawler.pipelines.kaggle")
lgr = getLogger("datalad.crawler.pipelines.simple_with_archives")


def pipeline(url=None,
# parameters for initial crawling
a_href_match_='.*/download/.*\.(tgz|tar.*|zip)',
tarballs=True,
datalad_downloader=False,
use_current_dir=False,
leading_dirs_depth=1,
a_href_follow=None,
a_text_follow=None,
rename=None,
backend='MD5E',
# how do we establish the path while crawling following the links
path_buildup=None, # 'relpath' - relative to the initial url; ?'a_text' - could construct from the "navigation bread crumbs"
# working with tarballs:
tarballs=None, # deprecated
fail_if_no_archives=True,
add_archive_leading_dir=False,
use_current_dir=False,
leading_dirs_depth=1,
archives_regex="\.(zip|tgz|tar(\..+)?)$",
# heavy customizations so this could be used from other pipelines
datalad_downloader=False,
annex=None,
add_annex_to_incoming_pipeline=False,
incoming_pipeline=None):
incoming_pipeline=None,
# Additional repo specs
backend='MD5E'
):
"""Pipeline to crawl/annex a simple web page with some tarballs on it

If the .gitattributes file in the repository already provides a largefiles
setting, none is passed here to the git-annex calls. Otherwise README* and
LICENSE* files will be added to git, while the rest go to the annex

Parameters
----------
url : str
Top URL to crawl
a_href_match_: str, optional
Regular expression for the HTML <a> href attribute to match, signalling
which files to download
a_href_follow: str, optional
Regular expression for the HTML <a> href attribute to follow/recurse into
to look for more URLs
a_text_follow: str, optional
Regular expression for the HTML <a> text content to follow/recurse into
to look for more URLs
tarballs: bool, optional
Deprecated, use `fail_if_no_archives`
fail_if_no_archives: bool, optional
Fail if no archives were found
archives_regex: str, optional
Regular expression to define what files are archives and should be
extracted
path_buildup: (None, 'relpath')
If not None, directs how to establish a path for the file out of its URL.
`relpath` - relative to the initial url
"""

if tarballs is not None:
# compatibility
fail_if_no_archives = tarballs
# This is messy and non-uniform ATM :-/ TODO: use constraints etc for typing of options?
fail_if_no_archives = assure_bool(fail_if_no_archives)

if not isinstance(leading_dirs_depth, int):
leading_dirs_depth = int(leading_dirs_depth)

@@ -64,12 +121,26 @@ def pipeline(url=None,

if url:
assert not incoming_pipeline
crawler = crawl_url(url)

follow_matchers = []
if a_href_follow:
follow_matchers.append(a_href_match(a_href_follow))
if a_text_follow:
follow_matchers.append(a_text_match(a_text_follow))

crawler = crawl_url(url, matchers=follow_matchers)

from datalad_crawler.nodes.misc import debug
incoming_pipeline = [ # Download all the archives found on the project page
crawler,
a_href_match(a_href_match_, min_count=1),
a_href_match(a_href_match_, min_count=1, finalize_each=False),
fix_url,
]
if path_buildup == 'relpath':
incoming_pipeline += [
# TODO: add nodes to build up the file path relative to the initial url
]

if rename:
incoming_pipeline += [sub({'filename': get_replacement_dict(rename)})]
incoming_pipeline += [annex]
@@ -81,6 +152,7 @@


# TODO: we could just extract archives processing setup into a separate pipeline template

return [
annex.switch_branch('incoming', parent='master'),
[
Expand All @@ -90,7 +162,7 @@ def pipeline(url=None,
[ # nested pipeline so we could skip it entirely if nothing new to be merged
annex.merge_branch('incoming', strategy='theirs', commit=False), #, skip_no_changes=False),
[ # Pipeline to augment content of the incoming and commit it to master
find_files("\.(zip|tgz|tar(\..+)?)$", fail_if_none=tarballs), # So we fail if none found -- there must be some! ;)),
find_files(archives_regex, fail_if_none=fail_if_no_archives), # So we fail if none found -- there must be some! ;)),
annex.add_archive_content(
existing='archive-suffix',
# Since inconsistent and seems in many cases no leading dirs to strip, keep them as provided
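For illustration, this is roughly how the extended template could be configured to recurse into sub-pages; a sketch assuming the `crawl_init` API used in the new test below, with made-up URL and patterns:

```python
# Sketch only: the URL and regular expressions are placeholders, and
# crawl_init is assumed to be available as in the test below.
from datalad.api import crawl_init

crawl_init(
    dict(
        url='http://example.com/datasets/',
        a_href_match_='.*\\.(dat|tgz)$',   # which links to download
        a_href_follow='.*/d[0-9]+/?$',     # which links to recurse into
        fail_if_no_archives=False,         # plain files, no archives expected
    ),
    save=True,
    template='simple_with_archives',
)
```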
40 changes: 40 additions & 0 deletions datalad_crawler/pipelines/tests/test_simple_with_archives.py
@@ -93,3 +93,43 @@ def test_crawl_autoaddtext(ind, topurl, outd):
ok_file_under_git(outd, "anothertext", annexed=False)
ok_file_under_git(outd, "d/textfile", annexed=False)
ok_file_under_git(outd, "d/tooshort", annexed=True)


@with_tree(tree={
'index.html': """<html><body>
<a href="d1/">sub1</a>
<a href="d2/">sub2</a>
</body></html>
""",
'd1': {
'1.dat': ''
},
'd2': {
'2.dat': 'text'
}})
@serve_path_via_http
@with_tempfile
@known_failure_direct_mode #FIXME
def test_recurse_follow(ind, topurl, outd):
ds = create(outd, text_no_annex=True)
def crawl_init_(**kw):
return crawl_init(
dict(url=topurl, a_href_match_='.*\.dat', fail_if_no_archives=False, **kw)
, save=True
, template='simple_with_archives'
)

with chpwd(outd): # TODO -- dataset argument
crawl_init_()
# nothing is matched, so it should blow up
with assert_raises(Exception):
crawl()

crawl_init_(a_href_follow='.*/d[1]/?')
crawl()

ok_clean_git(outd)
ok_file_under_git(outd, "d1/1.dat", annexed=True)
#ok_file_under_git(outd, "d/textfile", annexed=False)
#ok_file_under_git(outd, "d/tooshort", annexed=True)