# Big Data

# Spark környezet

In [1]:
from pyspark import SparkConf, SparkContext

conf = SparkConf()
sc = SparkContext(conf=conf)

# Feladat1

In [2]:
def mod(x):
    return (x, x % 2)

rdd = sc.parallelize(range(1000)).map(mod)\
.filter(lambda x : x[0] % 3 == 0 )\
.take(10)

print(rdd)

[(0, 0), (3, 1), (6, 0), (9, 1), (12, 0), (15, 1), (18, 0), (21, 1), (24, 0), (27, 1)]


# Feladat 2

In [26]:
lines = sc.textFile("C:/Users/Gogo/Documents/BigDataBSc/gyak2/jokt.txt")\
.flatMap(lambda l : l.split(" "))\
.filter(lambda w : w != "")\
.map(lambda w : w.replace(":","").replace(".",""))\
.map(lambda w : (w,1))\
.reduceByKey(lambda a,b: a+b)\
.sortBy(lambda x : x[1], False)\
.take(20)

print(lines)

[('a', 68), ('az', 41), ('K', 17), ('V', 17), ('nem', 15), ('A', 14), ('és', 13), ('hogy', 13), ('otthoni', 13), ('Az', 13), ('vagy', 11), ('kell', 10), ('ELTE', 10), ('14', 8), ('is', 8), ('karanténba', 7), ('de', 7), ('egyetemi', 6), ('előírásai', 6), ('KÉRDÉSEK', 6)]


# Feladat 2 masik megoldás

In [31]:
from functools import reduce

words = sc.textFile("C:/Users/Gogo/Documents/BigDataBSc/gyak2/jokt.txt")\
.flatMap(lambda l : l.split(" "))

words.filter(lambda w : w != "")\
.map(lambda w : w.replace(":","").replace(".",""))\
.map(lambda w : (w,1))\
.groupByKey()\
.map(lambda x : (x[0], reduce(lambda a,b: a+b, x[1]) ))\
.sortBy(lambda x : x[1], False)\
.take(20)

print(lines)

[('a', 68), ('az', 41), ('K', 17), ('V', 17), ('nem', 15), ('A', 14), ('és', 13), ('hogy', 13), ('otthoni', 13), ('Az', 13), ('vagy', 11), ('kell', 10), ('ELTE', 10), ('14', 8), ('is', 8), ('karanténba', 7), ('de', 7), ('egyetemi', 6), ('előírásai', 6), ('KÉRDÉSEK', 6)]


# Feladat 3

In [39]:
res = words.map(lambda x: len(x)).max()
#res = words.map(len).max()

print(res)

23


In [35]:
res = words.map(lambda x : (x, len(x))).reduce(lambda a,b : a if a[1]>b[1] else b)

print(res)

('felfüggesztés/halasztás', 23)


# Spark gyak 2

## accumulator

In [2]:
elte_count = sc.accumulator(0)
virus_count = sc.accumulator(0)
egyetem_count = sc.accumulator(0)

def countWord(s):
    if 'ELTE' in s: elte_count.add(1)
    elif 'virus' in s: virus_count.add(1)
    elif 'egyetem' in s: egyetem_count.add(1)

lines = sc.textFile("C:/Users/Gogo/Documents/BigDataBSc/gyak2/jokt.txt")\
.flatMap(lambda l : l.split(" "))\
.map(lambda w : (w,1))\
.foreach(countWord)

print(elte_count)
print(virus_count)
print(egyetem_count)

10
0
1


## Broadcast

In [21]:
brValue = ['ELTE', 'virus', 'egyetem']

br_list_sc = sc.broadcast(brValue)

def found(s):
    for item in br_list_sc.value:
        if item in s:
            return True
    return False

#.filter(lambda w : w in br_list_sc.value)\
#.filter(found)\
#.filter(lambda w : any(item in w for item in br_list_sc.value))\

lines = sc.textFile("C:/Users/Gogo/Documents/BigDataBSc/gyak2/jokt.txt")\
.flatMap(lambda l : l.split(" "))\
.filter(found)\
.map(lambda w : (w,1))\
.reduceByKey(lambda a,b : a+b)\
.collect()

print(lines)

PythonRDD[52] at RDD at PythonRDD.scala:53


## timing

In [10]:
from math import cos
def taketime(x):
    [cos(j) for j in range(100)]
    return cos(x)

%time taketime(2)

Wall time: 0 ns


-0.4161468365471424

In [11]:
%time rdd1 = sc.parallelize(range(1000000))

Wall time: 13 ms


In [16]:
%time inter = rdd1.map(taketime).cache()

Wall time: 28 ms


In [19]:
%time print('out: ', inter.reduce(lambda x,y: x + y))

out:  -0.28870546796848107
Wall time: 5.2 s


In [18]:
%time print('filter: ', inter.filter(lambda x:x>0).count())

filter:  500000
Wall time: 4.93 s


## toDebugString 

In [30]:
lines = sc.textFile("C:/Users/Gogo/Documents/BigDataBSc/gyak2/jokt.txt")\
.flatMap(lambda l : l.split(" "))\
.filter(found)\
.map(lambda w : (w,1))\
.reduceByKey(lambda a,b : a+b)

print(lines.toDebugString().decode())
print(inter.toDebugString().decode())

(2) PythonRDD[83] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[82] at mapPartitions at PythonRDD.scala:133 []
 |  ShuffledRDD[81] at partitionBy at <unknown>:0 []
 +-(2) PairwiseRDD[80] at reduceByKey at <ipython-input-30-f1222abbaa8d>:1 []
    |  PythonRDD[79] at reduceByKey at <ipython-input-30-f1222abbaa8d>:1 []
    |  C:/Users/Gogo/Documents/BigDataBSc/gyak2/jokt.txt MapPartitionsRDD[78] at textFile at <unknown>:0 []
    |  C:/Users/Gogo/Documents/BigDataBSc/gyak2/jokt.txt HadoopRDD[77] at textFile at <unknown>:0 []
(4) PythonRDD[42] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
 |       CachedPartitions: 4; MemorySize: 8.5 MiB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  ParallelCollectionRDD[38] at readRDDFromFile at PythonRDD.scala:262 [Memory Serialized 1x Replicated]


## Pi szamitas

In [36]:
import random
NUM_SAMPLES = 10000

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(NUM_SAMPLES)).filter(inside).count()

print("Pi: ",(4.0*count / NUM_SAMPLES))


Pi:  3.1396


## RDD join

In [41]:
x = sc.parallelize([("hadoop",1), ("spark",1)])
y = sc.parallelize([("hadoop",2), ("spark2",2)])

joined = x.join(y).collect()


#print(joined.toDebugString().decode())
print(joined)


[('hadoop', (1, 2))]


## Dataframe

In [42]:
from pyspark.sql import *

spark = SparkSession(sc)

In [44]:
textFile = sc.textFile("C:/Users/Gogo/Documents/BigDataBSc/gyak2/jokt.txt")
df = textFile.map(lambda l : Row(l)).toDF(["line"])
elte_count = df.filter(df["line"].like("%ELTE%"))

print(elte_count.count())

11


In [48]:
valuesA = [('alma',1),('korte',2),('banan',3)]
dfA = spark.createDataFrame(valuesA,['gyumi','db']).cache()

valuesB = [('alma',2),('narancs',4),('banan',5)]
dfB = spark.createDataFrame(valuesB,['gyumi','id']).cache()

dfA.show()
dfB.show()

+-----+---+
|gyumi| db|
+-----+---+
| alma|  1|
|korte|  2|
|banan|  3|
+-----+---+

+-------+---+
|  gyumi| id|
+-------+---+
|   alma|  2|
|narancs|  4|
|  banan|  5|
+-------+---+



In [49]:
inner_join = dfA.join(dfB, dfA.gyumi == dfB.gyumi)
inner_join.show()

+-----+---+-----+---+
|gyumi| db|gyumi| id|
+-----+---+-----+---+
| alma|  1| alma|  2|
|banan|  3|banan|  5|
+-----+---+-----+---+



In [60]:
left_join = dfA.join(dfB, dfA.gyumi == dfB.gyumi, how="full")
left_join.show()
#left_join.filter(~dfA['gyumi'].isNull()).show()


+-----+----+-------+----+
|gyumi|  db|  gyumi|  id|
+-----+----+-------+----+
|banan|   3|  banan|   5|
| null|null|narancs|   4|
| alma|   1|   alma|   2|
|korte|   2|   null|null|
+-----+----+-------+----+



## K-means

In [61]:
import numpy as np

def parseVector(line):
    return np.array([float(x) for x in line.split(' ')])


def closestPoint(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex

lines = sc.textFile("C:/Users/Gogo/Documents/BigDataBSc/gyak5/kmeansInput.txt").map(lambda r: r[0])
data = lines.map(parseVector).cache()
K = 3
convergeDist = 0.01

kPoints = data.takeSample(False, K, 1)
tempDist = 1.0

while tempDist > convergeDist:
    closest = data.map(lambda p: (closestPoint(p, kPoints), (p, 1)))
    pointStats = closest.reduceByKey(lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))
    newPoints = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
    tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)

    for (iK, p) in newPoints:
        kPoints[iK] = p

print("Final centers: " + str(kPoints))

Final centers: [array([0.]), array([9.]), array([9.])]
