Commit a5fc141c authored by Mathieu Rodic's avatar Mathieu Rodic

[FEAT] ngrams are parsed when a corpus is uploaded

(yeeehhhah!)
parent edf1f157
......@@ -6,6 +6,7 @@ sudo pip3 install virtualenv
sudo apt-get install rabbitmq-server
virtualenv-3.4 VENV
source VENV/bin/activate
pip install git+https://github.com/zzzeek/sqlalchemy.git@rel_1_1
pip install -U -r requirements.txt
```
......
......@@ -7,3 +7,15 @@
## Single project view
- re-implement deletion
# Taggers
Path for data used by taggers should be defined in `gargantext.constants`.
# Database
## Bulk insertion
The replacement of spaces should be more elegant.
......@@ -9,12 +9,18 @@ NODETYPES = [
'DOCUMENT',
]
from gargantext.util.taggers import *
LANGUAGES = {
'fr': {
# 'tagger': FrenchNgramsTagger
},
'en': {
# 'tagger': EnglishNgramsTagger
'tagger': TurboTagger,
# 'tagger': EnglishMeltTagger,
# 'tagger': NltkTagger,
},
'fr': {
'tagger': FrenchMeltTagger,
# 'tagger': TreeTagger,
},
}
......
from .nodes import *
from .users import *
from .ngrams import *
from gargantext.util.db import *
from gargantext.util.files import upload
from gargantext.constants import *
from .nodes import Node
__all__ = ['Ngram', 'NodeNgram']
class Ngram(Base):
__tablename__ = 'ngrams'
id = Column(Integer, primary_key=True)
terms = Column(String(255), unique=True)
n = Column(Integer)
class NodeNgram(Base):
__tablename__ = 'nodes_ngrams'
id = Column(Integer)
node_id = Column(Integer, ForeignKey(Node.id), primary_key=True)
ngram_id = Column(Integer, ForeignKey(Ngram.id), primary_key=True)
weight = Column(Float)
......@@ -30,12 +30,13 @@ class Node(Base):
# main data
name = Column(String(255))
date = Column(DateTime(), default=datetime.now)
# metadata
hyperdata = Column(JSONB, default={})
# metadata (see https://bashelton.com/2014/03/updating-postgresql-json-fields-via-sqlalchemy/)
hyperdata = Column(JSONB, default=dict)
def __init__(self, **kwargs):
if 'hyperdata' not in kwargs:
kwargs['hyperdata'] = kwargs.get('hyperdata', {})
Base.__init__(self, **kwargs)
self.hyperdata = {}
def __getitem__(self, key):
return self.hyperdata[key]
......@@ -43,6 +44,22 @@ class Node(Base):
def __setitem__(self, key, value):
self.hyperdata[key] = value
def save_hyperdata(self):
"""This is a necessary, yet ugly trick.
Indeed, PostgreSQL does not yet manage incremental updates (see
https://bashelton.com/2014/03/updating-postgresql-json-fields-via-sqlalchemy/)
"""
from sqlalchemy.orm.attributes import flag_modified
flag_modified(self, 'hyperdata')
# # previous trick (even super-uglier)
# hyperdata = self.hyperdata
# self.hyperdata = None
# session.add(self)
# session.commit()
# self.hyperdata = hyperdata
# session.add(self)
# session.commit()
def children(self, typename=None):
"""Return a query to all the direct children of the current node.
Allows filtering by typename (see `constants.py`)
......@@ -63,26 +80,22 @@ class Node(Base):
def resources(self):
if 'resources' not in self.hyperdata:
self.hyperdata['resources'] = []
self['resources'] = MutableList()
return self['resources']
def add_resource(self, type, path=None, url=None):
self.resources().append({'type': type, 'path':path, 'url':url})
self.resources().append(MutableDict(
{'type': type, 'path':path, 'url':url, 'extracted': False}
))
def status(self, action=None, progress=None, autocommit=False):
def status(self, action=None, progress=None):
if 'status' not in self.hyperdata:
self['status'] = {'action': action, 'progress': progress}
self['status'] = MutableDict(
{'action': action, 'progress': progress}
)
else:
if action is not None:
self['status']['action'] = action
if progress is not None:
self['status']['progress'] = progress
if autocommit:
hyperdata = self.hyperdata.copy()
self.hyperdata = None
session.add(self)
session.commit()
self.hyperdata = hyperdata
session.add(self)
session.commit()
return self['status']
......@@ -36,7 +36,7 @@ djcelery.setup_loader()
BROKER_URL = 'amqp://guest:guest@localhost:5672/'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
CELERY_IMPORTS = (
'gargantext.util.workflow',
'gargantext.util.toolchain',
# 'gargantext.models',
# 'gargantext.util.db',
)
......
......@@ -51,11 +51,11 @@ def get_cursor():
class bulk_insert:
def __init__(self, table, keys, data, cursor=None):
def __init__(self, table, fields, data, cursor=None):
# prepare the iterator
self.iter = iter(data)
# template
self.template = '%s' + (len(keys) - 1) * '\t%s' + '\n'
self.template = '%s' + (len(fields) - 1) * '\t%s' + '\n'
# prepare the cursor
if cursor is None:
db, cursor = get_cursor()
......@@ -65,7 +65,7 @@ class bulk_insert:
# insert data
if not isinstance(table, str):
table = table.__tablename__
cursor.copy_from(self, table, columns=keys)
cursor.copy_from(self, table, columns=fields)
# commit if necessary
if mustcommit:
db.commit()
......@@ -81,15 +81,19 @@ class bulk_insert:
readline = read
def bulk_insert_ifnotexists(model, uniquekey, fields, values):
db, cursor = get_cursor()
# create temporary table with given values
def bulk_insert_ifnotexists(model, uniquekey, fields, data, cursor=None):
if cursor is None:
db, cursor = get_cursor()
mustcommit = True
else:
mustcommit = False
# create temporary table with given data
sql_columns = 'id INTEGER'
for field in fields:
column = getattr(model, field)
sql_columns += ', %s %s' % (field, column.type, )
cursor.execute('CREATE TEMPORARY TABLE __tmp__ (%s)' % (sql_columns, ))
bulk_insert('__tmp__', fields, values, cursor=cursor)
bulk_insert('__tmp__', fields, data, cursor=cursor)
# update ids of the temporary table
cursor.execute('''
UPDATE __tmp__
......@@ -124,5 +128,6 @@ def bulk_insert_ifnotexists(model, uniquekey, fields, values):
row[1]: row[0] for row in cursor.fetchall()
}
# this is the end!
db.commit()
if mustcommit:
db.commit()
return result
from gargantext.util.languages import languages
from gargantext.constants import LANGUAGES
import nltk
import re
class NgramsExtractor:
def __init__(self, tagger):
self._tagger = tagger()
@staticmethod
def clean_text(text):
"""Clean the text for better POS tagging.
For now, only removes (short) XML tags.
"""
return re.sub(r'<[^>]{0,45}>', '', text)
def extract(self, text, rule='{<JJ.*>*<NN.*>+<JJ.*>*}', label='NP'):
text = self.clean_text(text)
grammar = nltk.RegexpParser(label + ': ' + rule)
tagged_tokens = list(self._tagger.tag_text(text))
if len(tagged_tokens):
grammar_parsed = grammar.parse(tagged_tokens)
for subtree in grammar_parsed.subtrees():
if subtree.label() == label:
yield subtree.leaves()
class NgramsExtractors(dict):
def __missing__(self, key):
if len(key) == 2 and key == key.lower():
tagger = LANGUAGES[key]['tagger']
self[key] = NgramsExtractor(tagger)
else:
self[key] = self[LANGUAGES[key].iso3]
return self[key]
# this below will be shared within the current thread
ngramsextractors = NgramsExtractors()
"""This module defines three distinct decorators for scheduling.
- `scheduled_now` is only there for debugging purpose: the decorated method
is executed as is
- `scheduled_thread` starts the decorated method as a new thread, but does not
really "follow" it
- `scheduled_celery` ensures tasks management via Celery, but is preferable not
to use while in debugging mode
Note that it is strongly discouraged to use database objects (model instances,
etc.) as parameters of methods decorated with those decorators.
Prefer using built-in types, such as `float`, `str`, `dict` (for a complete
list, see https://docs.python.org/3/library/stdtypes.html).
"""
def scheduled_now(func):
"""Provides a decorator to execute the task right away.
Mostly useful for debugging purpose.
......@@ -11,7 +24,7 @@ def scheduled_now(func):
import threading
def scheduled_thread(func):
"""Provides a decorator to schedule a task as a new thread.
Problem: a shutdown may lose the task forever...
Problem: an unexpected shutdown may lose the task forever...
"""
def go(*args, **kwargs):
thread = threading.Thread(target=func, args=args, kwargs=kwargs)
......
......@@ -42,11 +42,12 @@ def _readOutput(output, buffer):
time.sleep(0.1)
"""Use TreeTagger for the tagging.
Shall be used for french texts.
"""
class TreeTagger(Tagger):
"""Use TreeTagger for the tagging.
Shall be used for french texts.
"""
def start(self, treeTaggerPath='./lib/treetagger'):
print(treeTaggerPath)
if treeTaggerPath[0] == '.':
......
from .parsing import parse
from .ngrams_extraction import extract_ngrams
from gargantext.util.db import session
from gargantext.models import Node
def parse_extract(corpus):
# retrieve corpus from database from id
if isinstance(corpus, int):
corpus_id = corpus
corpus = session.query(Node).filter(Node.id == corpus_id).first()
if corpus is None:
print('NO SUCH CORPUS: #%d' % corpus_id)
return
# apply actions
parse(corpus)
extract_ngrams(corpus)
from gargantext.util.db import *
from gargantext.models import *
from gargantext.constants import *
from gargantext.util.ngramsextractors import ngramsextractors
from collections import defaultdict
def extract_ngrams(corpus, rule='{<JJ.*>*<NN.*>+<JJ.*>*}', keys=('title', 'abstract', )):
"""Extract ngrams for every document below the given corpus.
Default language is given by the resource type.
The result is then inserted into database.
Only fields indicated in `keys` are tagged.
"""
db, cursor = get_cursor()
nodes_ngrams_count = defaultdict(int)
ngrams_data = set()
# extract ngrams
resource_type_index = corpus.resources()[0]['type']
resource_type = RESOURCETYPES[resource_type_index]
default_language_iso2 = resource_type['default_language']
for document in corpus.children('DOCUMENT'):
for key in keys:
value = document.hyperdata.get(key, '')
if len(value) == 0:
continue
# get ngrams
language_iso2 = document.hyperdata.get('language_iso2', default_language_iso2)
ngramsextractor = ngramsextractors[language_iso2]
for ngram in ngramsextractor.extract(value):
tokens = tuple(token[0] for token in ngram)
terms = ' '.join(tokens)
nodes_ngrams_count[(document.id, terms)] += 1
ngrams_data.add((terms[:255], len(tokens), ))
# integrate ngrams
ngrams_ids = bulk_insert_ifnotexists(
model = Ngram,
uniquekey = 'terms',
fields = ('terms', 'n'),
data = ngrams_data,
cursor = cursor,
)
db.commit()
# integrate node-ngram associations
nodes_ngrams_data = tuple(
(node_ngram[0], ngrams_ids[node_ngram[1]], count)
for node_ngram, count in nodes_ngrams_count.items()
)
bulk_insert(
table = NodeNgram,
fields = ('node_id', 'ngram_id', 'weight'),
data = nodes_ngrams_data,
cursor = cursor,
)
db.commit()
# the end!
from gargantext.util.db import *
from gargantext.models import *
from gargantext.util.scheduling import scheduled
from gargantext.constants import *
@scheduled
def parse(corpus_id):
# retrieve corpus from database
corpus = session.query(Node).filter(Node.id == corpus_id).first()
if corpus is None:
print('NO SUCH CORPUS: #%d' % corpus_id)
return
def parse(corpus):
# retrieve resource information
documents_count = 0
for resource in corpus['resources']:
for resource in corpus.resources():
# information about the resource
if resource['extracted']:
continue
resource_parser = RESOURCETYPES[resource['type']]['parser']
resource_path = resource['path']
# extract and insert documents from corpus resource into database
......@@ -27,8 +21,12 @@ def parse(corpus_id):
)
session.add(document)
if documents_count % 64 == 0:
corpus.status(action='parsing', progress=documents_count, autocommit=True)
corpus.status(action='parsing', progress=documents_count)
corpus.save_hyperdata()
documents_count += 1
# update info about the resource
resource['extracted'] = True
corpus.save_hyperdata()
# commit all changes
corpus.status(action='parsing', progress=documents_count)
session.commit()
from gargantext.util import workflow
from gargantext.util.http import *
from gargantext.util.db import *
from gargantext.util.db_cache import cache
......@@ -6,6 +5,9 @@ from gargantext.util.files import upload
from gargantext.models import *
from gargantext.constants import *
from gargantext.util.scheduling import scheduled
from gargantext.util.toolchain import parse_extract
from datetime import datetime
from collections import defaultdict
import re
......@@ -92,7 +94,8 @@ def project(request, project_id):
)
session.add(corpus)
session.commit()
workflow.parse(corpus.id)
parse_extract
scheduled(parse_extract)(corpus.id)
# corpora within this project
corpora = project.children('CORPUS').all()
......
Django==1.9.2
PyYAML==3.11
RandomWords==0.1.12
SQLAlchemy==1.0.11
SQLAlchemy==1.1.0b1dev
amqp==1.4.9
anyjson==0.3.3
billiard==3.3.0.22
......@@ -15,6 +15,7 @@ jdatetime==1.7.2
kombu==3.0.33
lxml==3.5.0
nltk==3.1
numpy==1.10.4
psycopg2==2.6.1
pycountry==1.20
python-dateutil==2.4.2
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment