Commit 7f14294e authored by Castillo

new changes

parent 89c37e1f
...@@ -22,11 +22,11 @@ class Utils: ...@@ -22,11 +22,11 @@ class Utils:
# b: ( clusterID:string , FIfreq:int , FI:set() ) # b: ( clusterID:string , FIfreq:int , FI:set() )
def jaccard( self , a_raw , b_raw ): def jaccard( self , a_raw , b_raw ):
score = 0.0 score = 0.0
a = a_raw[2] a = a_raw[1]
b = b_raw[2] b = b_raw[1]
len_inter = len( a.intersection(b) ) len_inter = len( a.intersection(b) )
if len_inter>0: if len_inter>0:
score = len_inter / len( a.union(b) ) score = len_inter / float(len( a.union(b) ))
return [ score , a_raw[0] , b_raw[0] ] return [ score , a_raw[0] , b_raw[0] ]
def addCompleteSubGraph(self,terms): def addCompleteSubGraph(self,terms):
......
...@@ -29,11 +29,11 @@ def lineal_comparisons( years ): ...@@ -29,11 +29,11 @@ def lineal_comparisons( years ):
class Period: class Period:
def __init__(self , some_sc=False , period=1990 , minsetsize=3 , minsupp=0.0001 , numpart=1 , minfsetsize=4): def __init__(self , some_sc=False , period=1990 , minsetsize=3 , minsupp=0.0001 , numpart=10 , minfsetsize=4):
# self.sc = SparkContext("local","simple app") # self.sc = SparkContext("local","simple app")
self.sc = some_sc self.sc = some_sc
self.utls = Utils() self.utls = Utils()
self.abs_path = "/home/pksm3/Documents/Pubmed_MeSH-sample_txt/YYYY.txt" self.abs_path = "/home/pksm3/Documents/Pubmed_MeSH-chikungunya/YYYY.txt"
self.idx = period self.idx = period
self.minsetsize = minsetsize self.minsetsize = minsetsize
...@@ -43,21 +43,20 @@ class Period: ...@@ -43,21 +43,20 @@ class Period:
self.minfsetsize = minfsetsize self.minfsetsize = minfsetsize
self.FI = "" self.FI = ""
self.FI_c = 0
self.TF = "" self.TF = ""
def test1( self ):
    """Smoke-test the Spark context held in ``self.sc``.

    Distributes a small hard-coded list with ``parallelize`` and prints
    first the object it returns, then the result of ``take(2)`` on it.
    Returns nothing.
    """
    numbers = [1, 4, 3, 5]
    distributed = self.sc.parallelize(numbers)
    print(distributed)
    print(distributed.take(2))
def test2( self ): def test2( self ):
sc = self.sc sc = self.sc
logData = sc.textFile( "/var/log/syslog" ) logData = sc.textFile( "/var/log/syslog" )
numAs = logData.filter(lambda s: 'a' in s).count() numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count() numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs)) print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
print("- - - - - - - - ")
a=[1,4,3,5]
a = sc.parallelize(a)
print ( a )
print ( a.take(2) )
def get_json( self , the_year ): def get_json( self , the_year ):
return self.abs_path.replace( "YYYY" , str(the_year) ) return self.abs_path.replace( "YYYY" , str(the_year) )
...@@ -89,7 +88,9 @@ class Period: ...@@ -89,7 +88,9 @@ class Period:
data = sc.textFile( self.get_json(year) ) data = sc.textFile( self.get_json(year) )
print("fpg year:",year) print("fpg year:",year)
# transactions = data.map( lambda line: list(map(int, line.strip().split(' ')) ) ) # transactions = data.map( lambda line: list(map(int, line.strip().split(' ')) ) )
transactions = data.map( lambda line: [int(x) for x in line.strip().split(' ') if len(x)>=minsetsize ] ) # transactions = data.map( lambda line: [int(x) for x in line.strip().split(' ') if len(x)>=minsetsize ] )
transactions = data.map( lambda line: [int(x) for x in line.strip().split(' ')][1:] ).filter( lambda x: len(x)>=minsetsize )
# saving term frequencies # saving term frequencies
self.TF = transactions.flatMap(lambda xs: [x for x in xs]).map(lambda x: (x, 1)).reduceByKey( lambda v1,v2: v1+v2 ).sortBy(lambda x: x[1]) self.TF = transactions.flatMap(lambda xs: [x for x in xs]).map(lambda x: (x, 1)).reduceByKey( lambda v1,v2: v1+v2 ).sortBy(lambda x: x[1])
...@@ -102,10 +103,16 @@ class Period: ...@@ -102,10 +103,16 @@ class Period:
# for fi in lala: # for fi in lala:
# print(fi) # print(fi)
self.FI = model.freqItemsets().filter(lambda x: len(x.items)>=minfsetsize).persist() # self.FI = model.freqItemsets().filter(lambda x: len(x.items)>=minfsetsize).sortBy(lambda x: x.freq , ascending=False).zipWithIndex().filter(lambda x: x[1]<=10).map( lambda x: ( str(year)+"c"+str(x[1]) , x[0][0] , x[0][1] ) ).persist()
# res = self.FI.collect()
# for i in res: self.FI = model.freqItemsets().filter(lambda x: len(x.items)>=minfsetsize and x.freq>=2).sortBy(lambda x: x.freq , ascending=False).zipWithIndex().filter(lambda x: x[1]<=100).map( lambda x: ( str(year)+"c"+str(x[1]) , x[0][0] , x[0][1] ) ).persist()
# print(year,"->",i)
self.FI_c = self.FI.count()
# print( transactions.take(3) )
# print( "\t\t\t",self.FI.count() )
# print("")
if sexecm: if sexecm:
print("solo-mode: destroying sc") print("solo-mode: destroying sc")
...@@ -114,17 +121,32 @@ class Period: ...@@ -114,17 +121,32 @@ class Period:
class Phylo: class Phylo:
def __init__(self , t=[] , minK=4): def __init__(self , t=[] , minK=4 , minJ=0.0):
self.sc = SparkContext("local","simple app") self.sc = SparkContext("local","simple app")
self.utls = Utils() self.utls = Utils()
self.years = list( range( t[0], t[1]+1 ) ) self.years = list( range( t[0], t[1]+1 ) ) # combinations(self.years, 2)
# self.pairs = combinations(self.years, 2)
self.pairs, self.pairsD = lineal_comparisons( self.years ) self.pairs, self.pairsD = lineal_comparisons( self.years )
self.minsupp = 0.0001 self.minsupp = 0.0001
self.partitions = 1 self.partitions = 1
self.yearsD = {} self.yearsD = {}
self.minK = minK self.minK = minK
self.phylomm = [] self.phylomm = {}
self.minjacc = minJ
def get_atts( self , scn ):
    """Return a plain-dict summary of this instance's settings.

    Args:
        scn: value stored unchanged under the "sc" key.

    Returns:
        dict with the keys "sc", "years", "pairsD", "minsupp",
        "partitions", "yearsD", "minK", "phylomm" and "minjacc".
        "yearsD" holds only the sorted keys of ``self.yearsD``;
        "phylomm" holds one ``(key, count)`` tuple per entry of
        ``self.phylomm``, taken from each entry's "count" field.
    """
    # Only the per-entry count is exposed; the other fields of each
    # phylomm entry are left out of the summary.
    pair_counts = []
    for pair_key, entry in self.phylomm.items():
        pair_counts.append((pair_key, entry["count"]))

    summary = {}
    summary["sc"] = scn
    summary["years"] = self.years
    summary["pairsD"] = self.pairsD
    summary["minsupp"] = self.minsupp
    summary["partitions"] = self.partitions
    summary["yearsD"] = sorted(self.yearsD)
    summary["minK"] = self.minK
    summary["phylomm"] = pair_counts
    summary["minjacc"] = self.minjacc
    return summary
def FPG_chain( self ): def FPG_chain( self ):
N = self.yearsD N = self.yearsD
...@@ -133,43 +155,56 @@ class Phylo: ...@@ -133,43 +155,56 @@ class Phylo:
period_ = Period( some_sc=self.sc , period=y ) period_ = Period( some_sc=self.sc , period=y )
period_.FPG_unit() period_.FPG_unit()
N[ y ] = period_ N[ y ] = period_
# for i in sorted(N.keys()):
# print("year:",i)
# # print( "\t",N[i].count() )
# print( "\t||:9 ->",N[i].FI.filter(lambda x: len(x.items)==K and x.freq>=2) )
# print("")
# idx = sorted(N.keys())[0]
# results = N[idx].FI.filter(lambda x: len(x.items)==K and x.freq>=2).collect()
# for fi in results:
# print(fi)
def temp_matching(self , jacc_min , thepairs): def temp_matching(self , thepairs):
print( "temp_matching:",thepairs ) print( "temp_matching:",thepairs )
jacc_distances = [] jacc_distances = []
for y in thepairs: for y in thepairs:
jacc_distances.append( self.cartesian_product( y[0] , y[1] ) ) idx = sorted( [y[0] , y[1]] )
idx = "_".join(list(map(str,idx)))
self.phylomm = self.sc.union( jacc_distances ) if idx not in self.phylomm:
if self.yearsD[y[0]].FI_c>0 and self.yearsD[y[1]].FI_c>0:
# print("")
print( "\t",y[0] ,"x", y[1],"...")
rdd_ = self.cartesian_product( y[0] , y[1] ).persist()
rdd_c = rdd_.count()
self.phylomm[ idx ] = {
"rdd_" : rdd_,
"count": rdd_c
}
print( "\t\t",self.phylomm[ idx ]["count"])
else:
self.phylomm[ idx ] = {
"rdd_" : None,
"count": 0
}
def filter_jaccard(self , jacc_min ):
# print("\tin filter_jaccard!!")
rname = datetime.datetime.now().isoformat()+"" rname = datetime.datetime.now().isoformat()+""
thepath = "OUTPUTS/"+rname thepath = "OUTPUTS/"+rname
from output_2json import PhyloMaker from output_2json import PhyloMaker
lll = PhyloMaker(thepath , thepath , 0.0) lll = PhyloMaker(thepath , thepath , 0.0)
firstfilter = self.phylomm.filter( lambda x: x[0]>jacc_min ) found_distances = []
ress = firstfilter.collect() for idx in self.phylomm:
if self.phylomm[idx]["count"] > 0:
found_distances += self.phylomm[idx]["rdd_"].filter( lambda x: x[0]>=jacc_min ).collect()
# for i in found_distances:
# print(i)
print( "\t",jacc_min,"-> |JACCARD|:",len(found_distances) )
timerange = [ 1982 , 2014 ] timerange = [ 1982 , 2014 ]
phylojson = lll.export_phylo( liens=ress , T=timerange , jacc_min=jacc_min ) phylojson = lll.export_phylo( liens=found_distances , T=timerange , jacc_min=jacc_min )
# print( "|V|:", len(phylojson["nodes"])) print( "|V|:", len(phylojson["nodes"]))
# print( "|E|:", len(phylojson["links"])) print( "|E|:", len(phylojson["links"]))
if not os.path.exists(thepath): if not os.path.exists(thepath):
os.makedirs(thepath) os.makedirs(thepath)
...@@ -179,6 +214,11 @@ class Phylo: ...@@ -179,6 +214,11 @@ class Phylo:
print("http://localhost/Phylo/explorerjs.html?file=data/"+thepath+"/phylo_twjs.json") print("http://localhost/Phylo/explorerjs.html?file=data/"+thepath+"/phylo_twjs.json")
def filter_FIs(self , minfsetsize ):
    """Placeholder: ignores ``minfsetsize`` and returns a fixed two-item list."""
    placeholder = ["hola", "mundo"]
    return placeholder
# p["instname"] (value="blabla") # p["instname"] (value="blabla")
# p["frequency"] (value="1") # p["frequency"] (value="1")
# p["from_"] (value="1983") # p["from_"] (value="1983")
...@@ -213,14 +253,18 @@ class Phylo: ...@@ -213,14 +253,18 @@ class Phylo:
npairs.append( temmmp ) npairs.append( temmmp )
idx = "_".join(list(map(str,temmmp))) idx = "_".join(list(map(str,temmmp)))
if idx not in self.pairsD: if idx not in self.pairsD:
# self.cartesian_product( nyears[i] , nyears[i+1] )
newcomps.append( temmmp ) newcomps.append( temmmp )
except: except:
break break
pass pass
self.temp_matching( p_["minjaccard"] , newcomps) self.temp_matching( newcomps)
self.pairs, self.pairsD = lineal_comparisons( nyears ) self.pairs, self.pairsD = lineal_comparisons( nyears )
print("")
print( "old jacc:", self.minjacc )
print( "new jacc:", p_["minjaccard"] )
self.filter_jaccard ( p_["minjaccard"] )
print("")
return "Hey, that's pretty good" return "Hey, that's pretty good"
...@@ -231,10 +275,13 @@ class Phylo: ...@@ -231,10 +275,13 @@ class Phylo:
N = self.yearsD N = self.yearsD
jaccard_distances = {} jaccard_distances = {}
A_FI = N[A].FI.filter(lambda x: len(x.items)==10 and x.freq>=2).map(lambda x: (x.freq , set(x.items)) ).zipWithIndex().map( lambda x: ( str(A)+"c"+str(x[1]) , x[0][0] , x[0][1] ) ) #.map(lambda x: sorted(x.items) ) # A_FI = N[A].FI.filter(lambda x: len(x.items)>=6 and x.freq>=1).map(lambda x: (x.freq , set(x.items)) )#.zipWithIndex().map( lambda x: ( str(A)+"c"+str(x[1]) , x[0][0] , x[0][1] ) ) #.map(lambda x: sorted(x.items) )
B_FI = N[B].FI.filter(lambda x: len(x.items)==10 and x.freq>=2).map(lambda x: (x.freq , set(x.items)) ).zipWithIndex().map( lambda x: ( str(B)+"c"+str(x[1]) , x[0][0] , x[0][1] ) ) #.map(lambda x: sorted(x.items) ) # B_FI = N[B].FI.filter(lambda x: len(x.items)>=6 and x.freq>=1).map(lambda x: (x.freq , set(x.items)) )#.zipWithIndex().map( lambda x: ( str(B)+"c"+str(x[1]) , x[0][0] , x[0][1] ) ) #.map(lambda x: sorted(x.items) )
A_FI = N[A].FI.map(lambda x: (x[0] , set(x[1])) )
# for i in A_FI.collect():
# print( i )
B_FI = N[B].FI.map(lambda x: (x[0] , set(x[1])) )
pairs = A_FI.cartesian(B_FI) pairs = A_FI.cartesian(B_FI)
do_ = self.utls do_ = self.utls
......
## Phylomemies with Spark
\ No newline at end of file
This diff is collapsed.
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# python main.py -t 0.0.0.0 -p 6969 # python main.py -t 0.0.0.0 -p 6969
# more: https://flask-runner.readthedocs.io/en/latest/ # more: https://flask-runner.readthedocs.io/en/latest/
from flask import Flask, render_template, request , flash from flask import Flask, render_template, request , flash , jsonify
from flask.ext.runner import Runner #*# from flask.ext.runner import Runner #*#
import simplejson as json import simplejson as json
...@@ -14,6 +14,7 @@ csrf = CsrfProtect() ...@@ -14,6 +14,7 @@ csrf = CsrfProtect()
from PhyloSpark import Phylo from PhyloSpark import Phylo
import datetime import datetime
import pprint
app = Flask(__name__) app = Flask(__name__)
app.secret_key = 's3cr3t' app.secret_key = 's3cr3t'
...@@ -40,6 +41,28 @@ def flash_errors(form): ...@@ -40,6 +41,28 @@ def flash_errors(form):
error error
)) ))
def coolmetadata( OBJ ):
    """Build a JSON-friendly view of an object's instance attributes.

    Each attribute in ``OBJ.__dict__`` is handled as follows:

    * if ``json.dumps`` accepts the value, it is copied as-is;
    * otherwise, if the value has a ``keys()`` method whose result can be
      sorted, the sorted key list is stored in its place;
    * otherwise the attribute is left out.

    A diagnostic line is printed for every attribute.

    Args:
        OBJ: any object exposing ``__dict__``.

    Returns:
        dict mapping attribute name to the value or to its sorted key list.
    """
    to_show = {}
    # Snapshot the items so the loop never iterates a dict that is changing.
    for name, value in list(vars(OBJ).items()):
        try:
            json.dumps( value )
        except (TypeError, ValueError):
            # Not serializable (TypeError) or circular (ValueError):
            # fall back to the sorted keys when the value is mapping-like.
            try:
                keys = sorted(value.keys())
            except (AttributeError, TypeError):
                # No keys() at all, or keys that cannot be ordered.
                print( name , type(value) , "Danger2" )
                continue
            print( name , type(value) , "Danger1" , keys )
            to_show[ name ] = keys
        else:
            print( name , type(value) )
            to_show[ name ] = value
    return to_show
# p_["instname"] (value="blabla") # p_["instname"] (value="blabla")
...@@ -57,19 +80,20 @@ def form_example(): ...@@ -57,19 +80,20 @@ def form_example():
sID = p_["instname"] sID = p_["instname"]
if sID not in I: if sID not in I:
I[ sID ] = Phylo( t=[ p_["from_"] , p_["to_"] ] ) I[ sID ] = Phylo( t=[ p_["from_"] , p_["to_"] ] , minJ=p_["minjaccard"])
# executes a fp-growth per year # executes a fp-growth per year
I[ sID ].FPG_chain() I[ sID ].FPG_chain()
# pairs of years to be multiplied # pairs of years to be multiplied
npairs = I[ sID ].pairs npairs = I[ sID ].pairs
I[ sID ].temp_matching( jacc_min=p_["minjaccard"] , thepairs=npairs ) I[ sID ].temp_matching( thepairs=npairs )
I[ sID ].filter_jaccard ( jacc_min=p_["minjaccard"] )
else: else:
results = I[ sID ].partial_compute( p_ ) results = I[ sID ].partial_compute( p_ )
print("\nYour \"",sID,"\" instance has been modified.") print("\nYour \"",sID,"\" instance has been modified.")
print(p_,"\n") # print(p_,"\n")
else: else:
flash_errors(form) flash_errors(form)
...@@ -86,10 +110,17 @@ def close_contexts(): ...@@ -86,10 +110,17 @@ def close_contexts():
for i in I: for i in I:
del I[i] del I[i]
@app.route('/') @app.route('/api/context/<contextID>', methods=("GET",))
def hello_world(): def get_context(contextID=None):
print ("hello world")
return 'Hello, World!' results = ["null"]
if contextID in I:
to_show = I[contextID].get_atts( scn=contextID )
return jsonify( to_show )
return jsonify( results )
@app.route('/phylo/all') @app.route('/phylo/all')
......
...@@ -6,9 +6,12 @@ import pprint ...@@ -6,9 +6,12 @@ import pprint
TEST_CASE = -1 TEST_CASE = -1
if len(sys.argv)>1 and sys.argv[1]!=None: TEST_CASE = sys.argv[1] if len(sys.argv)>1 and sys.argv[1]!=None: TEST_CASE = sys.argv[1]
THREADS = 16
def lala(): def lala():
from example import Phylo from PhyloSpark import Phylo
lala = Phylo( t=[1983,1984] ) lala = Phylo( t=[1983,1984] )
lala.FPG_chain() lala.FPG_chain()
results = lala.cartesian_product( 1983 , 1984 ) results = lala.cartesian_product( 1983 , 1984 )
...@@ -16,7 +19,7 @@ def lala(): ...@@ -16,7 +19,7 @@ def lala():
# define the function blocks # define the function blocks
def test_cartesianproduct(): def test_cartesianproduct():
from example import Phylo from PhyloSpark import Phylo
lala = Phylo( t=[1983,1984] ) lala = Phylo( t=[1983,1984] )
lala.FPG_chain() lala.FPG_chain()
results = lala.cartesian_product( 1983 , 1984 ).collect() results = lala.cartesian_product( 1983 , 1984 ).collect()
...@@ -49,14 +52,63 @@ def prime(): ...@@ -49,14 +52,63 @@ def prime():
print ("n is a prime number\n") print ("n is a prime number\n")
def interRDD():
    """Print the whitelist ids that also appear as the first id of a 2010 record.

    Creates a ``Phylo`` instance to obtain its Spark context, reads the
    hard-coded 2010 record file and the whitelist file, intersects the
    whitelist with the first kept token of every record, collects the
    result and prints it.
    """
    from PhyloSpark import Phylo

    records_path = "/home/pksm3/These/DATA/Pubmed-filter_txt_pmid-meshID/2010.json.txt"
    whitelist_path = "/home/pksm3/Downloads/whitelist.txt"

    phylo = Phylo( t=[1983,1984] )
    spark = phylo.sc

    def parse_record(line):
        # NOTE(review): this drops tokens shorter than 3 characters; it does
        # not filter records by their length. Confirm that is intended.
        return [int(token) for token in line.strip().split(' ') if len(token) >= 3]

    def parse_whitelist_entry(line):
        return int(line.strip())

    records = spark.textFile(records_path).map(parse_record)
    whitelist = spark.textFile(whitelist_path).map(parse_whitelist_entry)
    first_ids = records.map(lambda record: record[0])
    common_ids = whitelist.intersection(first_ids).collect()
    print(common_ids)
def processLine( line ):
    """Parse a line of whitespace-separated integers into ``(first, rest)``.

    Tokens may be separated by any run of whitespace (repeated spaces,
    tabs); leading and trailing whitespace is ignored.

    Args:
        line: text line holding one or more integers.

    Returns:
        tuple ``(first, rest)`` where ``first`` is the first integer and
        ``rest`` is the list of the remaining integers (possibly empty).

    Raises:
        ValueError: if a token is not an integer or the line has no tokens.
    """
    # split() with no argument collapses whitespace runs, so doubled spaces
    # no longer produce empty tokens that int() would reject.
    values = [int(token) for token in line.split()]
    if not values:
        # Keep the exception type a blank line has always raised.
        raise ValueError("empty line: expected at least one integer")
    return (values[0], values[1:])
def getWL ( sc_ , query ):
    """Load a whitelist text file as ``(id, 1)`` pairs.

    Args:
        sc_: object providing ``textFile`` (a Spark context in the callers
            visible in this file).
        query: path handed to ``sc_.textFile``.

    Returns:
        the result of mapping every line to ``(int(stripped_line), 1)``.
    """
    def as_keyed_pair(line):
        return (int(line.strip()), 1)

    whitelist_lines = sc_.textFile( query )
    return whitelist_lines.map(as_keyed_pair)
def interDataSet( sc_ , period , WL ):
    """Join one period's records with the whitelist and collect the matches.

    Reads the period's record file, parses each line with ``processLine``,
    keeps records whose value list has at least 3 items, joins them with
    ``WL`` on the key and maps each match to ``(key, record_values)``.

    The collected result is currently discarded; the function returns None.

    Args:
        sc_: object providing ``textFile``.
        period: string inserted into the hard-coded data path.
        WL: keyed dataset to join against (built by ``getWL`` in the caller
            visible in this file).
    """
    source_path = "/home/pksm3/These/DATA/Pubmed-filter_txt_pmid-meshID/" + period + ".json.txt"
    parsed = sc_.textFile(source_path).map(processLine)
    long_enough = parsed.filter(lambda record: len(record[1]) >= 3)
    joined = long_enough.join(WL)
    # After the join each item is (key, (record_values, whitelist_value)).
    matched = joined.map(lambda item: (item[0], item[1][0]))
    matched.collect()
def test_workflow():
    """Run ``interDataSet`` for every year from 1983 to 2014 inclusive.

    Creates a ``Phylo`` instance to obtain its Spark context, loads the
    whitelist once with ``getWL`` and reuses it for each yearly call.
    """
    from PhyloSpark import Phylo

    phylo = Phylo( t=[1983,1984] )
    spark = phylo.sc
    whitelist = getWL(spark, "/home/pksm3/Downloads/whitelist.txt")
    for year in range(1983, 2015):
        interDataSet(spark, str(year), whitelist)
DO_ = { DO_ = {
"cartesian-print" : test_cartesianproduct, "cartesian-print" : test_cartesianproduct,
"read-jaccs" : reading_jaccs, "read-jaccs" : reading_jaccs,
"lala" : lala, "lala" : lala,
"3" : prime, "interrdd" : interRDD,
"5" : prime, "interdataset" : test_workflow,
"7" : prime, "7" : prime,
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment