Commit ca0a852a authored by Castillo

new method integrated

parent 808b7a71
......@@ -31,8 +31,8 @@ class Utils:
# a: ( clusterID:string , FIfreq:int , FI:set() )
# b: ( clusterID:string , FIfreq:int , FI:set() )
# ( cID_a , cID_b , FI_a:set() , FI_b:set() )
# x [ 0 , 1 , 2 , 3 ]
def jaccard_idx( self , raw ):
score = 0.0
a = raw[2]
......
......@@ -6,12 +6,12 @@ findspark.init()
from pyspark import SparkContext
# from pyspark import SparkConf
from pyspark.mllib.fpm import FPGrowth
from itertools import combinations
from InterUnion import Utils
import simplejson as json
import os
import datetime
import pprint
import itertools
def lineal_comparisons( years ):
D = {}
......@@ -130,11 +130,11 @@ class Period:
# # # # = = [ / Extracting Frequent Itemsets ] = = = # # #
self.FI_c = self.FI.count()
print("")
print("")
print("----FI----",year)
for i in self.FI.collect():
print( i )
# print("")
# print("")
# print("----FI----",year)
# for i in self.FI.collect():
# print( i )
# # # # = = [ Getting {item,cluster*} ] = = = # # #
......@@ -155,13 +155,13 @@ class Period:
print("")
print("")
print( "K:", self.K ,year)
ress = self.K.collect()
# ress = set( ress )
for i in ress:
print( i )
# # print("")
# # print("")
# # print( "K:", self.K ,year)
# ress = self.K.collect()
# # ress = set( ress )
# for i in ress:
# print( i )
# print( T.take(3) )
print( "\t\t\t",self.FI_c )
......@@ -231,7 +231,7 @@ class Phylo:
if self.yearsD[y[0]].FI_c>0 and self.yearsD[y[1]].FI_c>0:
# print("")
print( "\t",y[0] ,"x", y[1],"...")
rdd_ = self.cartesian_product( y[0] , y[1] ).persist()
rdd_ = self.cartesian_product_idx( y[0] , y[1] ).persist()
rdd_c = rdd_.count()
self.phylomm[ idx ] = {
"rdd_" : rdd_,
......@@ -337,7 +337,7 @@ class Phylo:
def cartesian_product ( self , A , B ):
N = self.yearsD
jaccard_distances = {}
_dists = {}
# A_FI = N[A].FI.filter(lambda x: len(x.items)>=6 and x.freq>=1).map(lambda x: (x.freq , set(x.items)) )#.zipWithIndex().map( lambda x: ( str(A)+"c"+str(x[1]) , x[0][0] , x[0][1] ) ) #.map(lambda x: sorted(x.items) )
# B_FI = N[B].FI.filter(lambda x: len(x.items)>=6 and x.freq>=1).map(lambda x: (x.freq , set(x.items)) )#.zipWithIndex().map( lambda x: ( str(B)+"c"+str(x[1]) , x[0][0] , x[0][1] ) ) #.map(lambda x: sorted(x.items) )
......@@ -349,31 +349,43 @@ class Phylo:
pairs = A_FI.cartesian(B_FI)
do_ = self.utls
jaccard_distances = pairs.map( lambda pair: do_.jaccard( pair[0] , pair[1] ) ).filter( lambda x: x[0]>0 )
_dists = pairs.map( lambda pair: do_.jaccard( pair[0] , pair[1] ) ).filter( lambda x: x[0]>0 )
return jaccard_distances
return _dists
def cartesian_product_idx ( self , A , B ):
N = self.yearsD
jaccard_distances = {}
# A_FI = N[A].FI.filter(lambda x: len(x.items)>=6 and x.freq>=1).map(lambda x: (x.freq , set(x.items)) )#.zipWithIndex().map( lambda x: ( str(A)+"c"+str(x[1]) , x[0][0] , x[0][1] ) ) #.map(lambda x: sorted(x.items) )
# B_FI = N[B].FI.filter(lambda x: len(x.items)>=6 and x.freq>=1).map(lambda x: (x.freq , set(x.items)) )#.zipWithIndex().map( lambda x: ( str(B)+"c"+str(x[1]) , x[0][0] , x[0][1] ) ) #.map(lambda x: sorted(x.items) )
A_FI = N[A].FI.map(lambda x: (x[0] , set(x[1])) )
# for i in A_FI.collect():
# print( i )
B_FI = N[B].FI.map(lambda x: (x[0] , set(x[1])) )
pairs = A_FI.cartesian(B_FI)
_dists = {}
# A_FI = N[A].FI.map(lambda x: (x[0] , set(x[1])) )
# B_FI = N[B].FI.map(lambda x: (x[0] , set(x[1])) )
# pairs = A_FI.cartesian(B_FI)
do_ = self.utls
jaccard_distances = pairs.map( lambda pair: do_.jaccard( pair[0] , pair[1] ) ).filter( lambda x: x[0]>0 )
# _dists = pairs.map( lambda pair: do_.jaccard( pair[0] , pair[1] ) ).filter( lambda x: x[0]>0 )
aa = ddo.yearsD[A].K
bb = ddo.yearsD[B].K
join_terms = aa.join( bb )
cartesian = join_terms.map( lambda x : list(itertools.product( x[1][0] , x[1][1] )) )
cartesian = cartesian.flatMap(lambda x: x).groupBy(lambda x: x).map(lambda x : x[0])
getFIaa = ddo.yearsD[A].FI.collectAsMap()
getFIbb = ddo.yearsD[B].FI.collectAsMap()
pairs = cartesian.map( lambda r : (r[0],r[1],set(getFIaa[r[0]].items), set(getFIbb[r[1]].items) ) )
_dists = pairs.map( lambda x: do_.jaccard_idx( x ) )
_dists = _dists.filter( lambda x: x[0]>0 ).map( lambda x : ( x[0] ,str(A)+"c"+str(x[1]) , str(B)+"c"+str(x[2]) ) )
# for i in _dists.collect():
# print( i )
return jaccard_distances
return _dists
# # instantiating and executing
......
......@@ -135,83 +135,72 @@ def test_newstruct():
FIbb = ddo.yearsD[periods_[1]].FI
# print(periods_[0],":", ddo.yearsD[ periods_[0] ].K)
# for i in aa.collect():
# print( i[0],list(i[1]) )
# print("")
# print(periods_[1],":", ddo.yearsD[ periods_[1] ].K)
# for i in bb.collect():
# print( i[0],list(i[1]) )
# print("")
# intersecciones = aa.join( bb )
# ress = intersecciones.collect()
# # print(periods_[0],":", ddo.yearsD[ periods_[0] ].K)
# # for i in aa.collect():
# # print( i[0],list(i[1]) )
# # print("")
# # print(periods_[1],":", ddo.yearsD[ periods_[1] ].K)
# # for i in bb.collect():
# # print( i[0],list(i[1]) )
# # print("")
# # intersecciones = aa.join( bb )
# # ress = intersecciones.collect()
# # print("")
# # print("")
# # summ = 0
# # print("INTERSECTIONS")
# # for i in ress:
# # elemID = i[0]
# # clustersaa = list(i[1][0])
# # clustersbb = list(i[1][1])
# # print(elemID)
# # print("\t",periods_[0] ,":", clustersaa )
# # print("\t",periods_[1] ,":", clustersbb )
# # # print("\t\t", len(clustersaa) ,"x", len(clustersbb) )
# # mult = len(clustersaa)*len(clustersbb)
# # summ += mult
# # print("\t\t\t", mult )
# # print("")
# # print( "\t","dacha raw:", summ)
# # print("")
# # print("")
# # print( "|FI|",periods_[0],":", len_FIaa)
# # print( "|FI|",periods_[0],":", len_FIbb)
# # print( "\tcartesian product:", len_FIaa*len_FIbb)
# print("")
# print("")
# summ = 0
# print("INTERSECTIONS")
# for i in ress:
# print("----join_terms----")
# elemID = i[0]
join_terms = aa.join( bb )
# ress = join_terms.collect()
# for i in ress:
# print(i)
# clustersaa = list(i[1][0])
# clustersbb = list(i[1][1])
# print(elemID)
# print("\t",periods_[0] ,":", clustersaa )
# print("\t",periods_[1] ,":", clustersbb )
# # print("\t\t", len(clustersaa) ,"x", len(clustersbb) )
# mult = len(clustersaa)*len(clustersbb)
# summ += mult
# print("\t\t\t", mult )
# print("")
# print( "\t","dacha raw:", summ)
# print("")
# print("")
# print( "|FI|",periods_[0],":", len_FIaa)
# print( "|FI|",periods_[0],":", len_FIbb)
# print( "\tcartesian product:", len_FIaa*len_FIbb)
print("")
print("")
print("----join_terms----")
join_terms = aa.join( bb )
ress = join_terms.collect()
for i in ress:
print(i)
clustersaa = list(i[1][0])
clustersbb = list(i[1][1])
print("\t",periods_[0] ,":", clustersaa )
print("\t",periods_[1] ,":", clustersbb )
print("")
cartesian = join_terms.map( lambda x : list(itertools.product( x[1][0] , x[1][1] )) )
cartesian = cartesian.flatMap(lambda x: x).groupBy(lambda x: x).map(lambda x : x[0])
# print(cartesian)
# ress = cartesian.collect()
# for i in ress:
# print(i)
# # comparisons = list(i)
# # print( "\t|AxB|:",len(comparisons) )
# # for j in comparisons:
# # print( "\t\t",j[0],"vs",j[1] )
# # print( "\t\t\t",FIaa.lookup(j[0]) )
# # print( "\t\t\t",FIbb.lookup(j[1]) )
# # print("")
print("")
print("")
print("----cartesian.map----")
# print("")
# print("")
# print("----cartesian.map----")
getFIaa = FIaa.collectAsMap()
getFIbb = FIbb.collectAsMap()
pairs = cartesian.map( lambda r : (r[0],r[1],set(getFIaa[r[0]].items), set(getFIbb[r[1]].items) ) )
for i in pairs.collect():
print(i)
# for i in pairs.collect():
# print(i)
print("")
print("")
print("----jaccard_distances----")
# print("")
# print("")
# print("----jaccard_distances----")
jaccard_distances = pairs.map( lambda x: utls.jaccard_idx( x ) )
jaccard_distances = jaccard_distances.filter( lambda x: x[0]>0 ).map( lambda x : ( x[0] ,str(periods_[0])+"c"+str(x[1]) , str(periods_[1])+"c"+str(x[2]) ) )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment