In [1]:
from pyspark.sql import *

spark = SparkSession.builder.getOrCreate()

In [2]:
df = spark.read.format('csv').load('dolgozo.csv')

In [3]:
df.show(5)

+----+-----+-----------+------+---------+-------+-------+-----+
| _c0|  _c1|        _c2|   _c3|      _c4|    _c5|    _c6|  _c7|
+----+-----+-----------+------+---------+-------+-------+-----+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
|7839| KING|  PRESIDENT|  0000|81-NOV-17|   5000|      0|   10|
|7698|BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|
|7782|CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
|7566|JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|
+----+-----+-----------+------+---------+-------+-------+-----+
only showing top 5 rows



In [4]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)



In [5]:
df = spark.read.format('csv')\
.option('header',True)\
.option('inferschema',True)\
.load('dolgozo.csv')

In [6]:
df.show(5)

+----+------+-----------+------+---------+-------+-------+-----+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+------+-----------+------+---------+-------+-------+-----+
|7839|  KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|
|7698| BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
|7566| JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|
|7654|MARTIN|   SALESMAN|  7698|81-SEP-28|   1250|   1400|   30|
+----+------+-----------+------+---------+-------+-------+-----+
only showing top 5 rows



In [7]:
df.printSchema()

root
 |-- DKOD: integer (nullable = true)
 |-- DNEV: string (nullable = true)
 |-- FOGLALKOZAS: string (nullable = true)
 |-- FONOKE: integer (nullable = true)
 |-- BELEPES: string (nullable = true)
 |-- FIZETES: integer (nullable = true)
 |-- JUTALEK: integer (nullable = true)
 |-- OAZON: integer (nullable = true)



In [8]:
from pyspark.sql.types import *

dolgozoSchema = StructType([
   StructField("DKOD", IntegerType(),True), 
   StructField("DNEV", StringType(),True), 
   StructField("FOGLALKOZAS", StringType(),True), 
   StructField("FONOKE", IntegerType(),True), 
   StructField("BELEPES", StringType(),True), 
   StructField("FIZETES", IntegerType(),True), 
   StructField("JUTALEK", IntegerType(),True), 
   StructField("OAZON", IntegerType(),True) 
])

df = spark.read.format('csv')\
.schema(dolgozoSchema).option('header',True).load('dolgozo.csv')

In [9]:
df.show(5)

+----+------+-----------+------+---------+-------+-------+-----+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+------+-----------+------+---------+-------+-------+-----+
|7839|  KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|
|7698| BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
|7566| JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|
|7654|MARTIN|   SALESMAN|  7698|81-SEP-28|   1250|   1400|   30|
+----+------+-----------+------+---------+-------+-------+-----+
only showing top 5 rows



In [10]:
df.columns

['DKOD',
 'DNEV',
 'FOGLALKOZAS',
 'FONOKE',
 'BELEPES',
 'FIZETES',
 'JUTALEK',
 'OAZON']

In [19]:
#df['DNEV']

df.take(4)

[Row(DKOD=7839, DNEV='KING', FOGLALKOZAS='PRESIDENT', FONOKE=0, BELEPES='81-NOV-17', FIZETES=5000, JUTALEK=0, OAZON=10),
 Row(DKOD=7698, DNEV='BLAKE', FOGLALKOZAS='MANAGER', FONOKE=7839, BELEPES='81-MAY-01', FIZETES=2850, JUTALEK=0, OAZON=30),
 Row(DKOD=7782, DNEV='CLARK', FOGLALKOZAS='MANAGER', FONOKE=7839, BELEPES='81-JUN-09', FIZETES=2450, JUTALEK=0, OAZON=10),
 Row(DKOD=7566, DNEV='JONES', FOGLALKOZAS='MANAGER', FONOKE=7839, BELEPES='81-APR-02', FIZETES=2975, JUTALEK=0, OAZON=20)]

In [21]:
dSchema = StructType([
   StructField("DKOD", IntegerType(),True), 
   StructField("DNEV", StringType(),True), 
   StructField("FIZETES", IntegerType(),True)
])

rows = [
    Row(1111, 'Valaki', 1000),
    Row(1121, 'Valaki2', 2000),
    Row(1131, 'Valaki3', 3000)
]

dolgozoRDD = spark.sparkContext.parallelize(rows)
dolgozoDF = spark.createDataFrame(dolgozoRDD, dSchema)
dolgozoDF.show()


+----+-------+-------+
|DKOD|   DNEV|FIZETES|
+----+-------+-------+
|1111| Valaki|   1000|
|1121|Valaki2|   2000|
|1131|Valaki3|   3000|
+----+-------+-------+



In [23]:
df.select('DNEV', 'FIZETES').show(2)

+-----+-------+
| DNEV|FIZETES|
+-----+-------+
| KING|   5000|
|BLAKE|   2850|
+-----+-------+
only showing top 2 rows



In [25]:
df.selectExpr("*","(FIZETES+JUTALEK) as KERESET").show(5)

+----+------+-----------+------+---------+-------+-------+-----+-------+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|KERESET|
+----+------+-----------+------+---------+-------+-------+-----+-------+
|7839|  KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|   5000|
|7698| BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|   2850|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|   2450|
|7566| JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|   2975|
|7654|MARTIN|   SALESMAN|  7698|81-SEP-28|   1250|   1400|   30|   2650|
+----+------+-----------+------+---------+-------+-------+-----+-------+
only showing top 5 rows



In [26]:
df.selectExpr("avg(FIZETES)","count(distinct(OAZON))").show(5)

+------------+---------------------+
|avg(FIZETES)|count(DISTINCT OAZON)|
+------------+---------------------+
|   1976.5625|                    3|
+------------+---------------------+



In [29]:
from pyspark.sql.functions import *

df.withColumn("KERESET", expr("FIZETES + JUTALEK")).show(2)
#df.withColumn("KERESET", lambda x : x['FIZETES']+x['JUTALEK'])

df.withColumnRenamed("FIZETES","HETI_FIZETES").show(3)

df.drop("FONOKE").show(2)

+----+-----+-----------+------+---------+-------+-------+-----+-------+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|KERESET|
+----+-----+-----------+------+---------+-------+-------+-----+-------+
|7839| KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|   5000|
|7698|BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|   2850|
+----+-----+-----------+------+---------+-------+-------+-----+-------+
only showing top 2 rows

+----+-----+-----------+------+---------+------------+-------+-----+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|HETI_FIZETES|JUTALEK|OAZON|
+----+-----+-----------+------+---------+------------+-------+-----+
|7839| KING|  PRESIDENT|     0|81-NOV-17|        5000|      0|   10|
|7698|BLAKE|    MANAGER|  7839|81-MAY-01|        2850|      0|   30|
|7782|CLARK|    MANAGER|  7839|81-JUN-09|        2450|      0|   10|
+----+-----+-----------+------+---------+------------+-------+-----+
only showing top 3 rows

+----+-----+-----------+---------+-

In [34]:
df.filter(col('FONOKE') == 7839).show(2)

df.where(col('FONOKE') == 7839).where(col('FIZETES') > 2000).show(2)

df.where((col('FONOKE') == 7839) & (col('FIZETES') > 2000)).show(2)

+----+-----+-----------+------+---------+-------+-------+-----+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+-----+-----------+------+---------+-------+-------+-----+
|7698|BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|
|7782|CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
+----+-----+-----------+------+---------+-------+-------+-----+
only showing top 2 rows

+----+-----+-----------+------+---------+-------+-------+-----+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+-----+-----------+------+---------+-------+-------+-----+
|7698|BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|
|7782|CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
+----+-----+-----------+------+---------+-------+-------+-----+
only showing top 2 rows

+----+-----+-----------+------+---------+-------+-------+-----+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+-----+-----------+------+---------+-------+-----

In [35]:
df.select(col("OAZON")).distinct().count()

3

In [36]:
df.orderBy(col('OAZON'), col("FIZETES").desc() ).show(3)

+----+------+-----------+------+---------+-------+-------+-----+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+------+-----------+------+---------+-------+-------+-----+
|7839|  KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
|7934|MILLER|      CLERK|  7782|82-JAN-23|   1300|      0|   10|
+----+------+-----------+------+---------+-------+-------+-----+
only showing top 3 rows



In [42]:
df.groupBy("OAZON").avg("FIZETES").show(5)

+-----+------------------+
|OAZON|      avg(FIZETES)|
+-----+------------------+
|   20|            2112.5|
|   10|            2387.5|
|   30|1566.6666666666667|
+-----+------------------+



In [43]:
osztaly = spark.read.format('csv').option('header',True)\
.option("inferSchema",True).load("osztaly.csv")

osztaly.show(3)

+-----+----------+---------+
|OAZON|       NEV|TELEPHELY|
+-----+----------+---------+
|   10|ACCOUNTING| NEW YORK|
|   20|  RESEARCH|   DALLAS|
|   30|     SALES|  CHICAGO|
+-----+----------+---------+
only showing top 3 rows



In [44]:
df.alias("D")\
.join(osztaly.alias("O"), col("D.OAZON") == col("O.OAZON"), "inner" ).show(5)

+----+------+-----------+------+---------+-------+-------+-----+-----+----------+---------+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|OAZON|       NEV|TELEPHELY|
+----+------+-----------+------+---------+-------+-------+-----+-----+----------+---------+
|7839|  KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|   10|ACCOUNTING| NEW YORK|
|7698| BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|   30|     SALES|  CHICAGO|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|   10|ACCOUNTING| NEW YORK|
|7566| JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|   20|  RESEARCH|   DALLAS|
|7654|MARTIN|   SALESMAN|  7698|81-SEP-28|   1250|   1400|   30|   30|     SALES|  CHICAGO|
+----+------+-----------+------+---------+-------+-------+-----+-----+----------+---------+
only showing top 5 rows



In [45]:
df.createOrReplaceTempView("dolgozoTemp")

In [47]:
spark.sql("SELECT * FROM dolgozoTemp").show(2)

spark.sql("SELECT OAZON, count(*) FROM dolgozoTemp GROUP BY OAZON").show(3)

+----+-----+-----------+------+---------+-------+-------+-----+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+-----+-----------+------+---------+-------+-------+-----+
|7839| KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|
|7698|BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|
+----+-----+-----------+------+---------+-------+-------+-----+
only showing top 2 rows

+-----+--------+
|OAZON|count(1)|
+-----+--------+
|   20|       6|
|   10|       4|
|   30|       6|
+-----+--------+



In [51]:
spark.sql("""
SELECT OAZON, count(*) FROM dolgozoTemp GROUP BY OAZON
""").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[OAZON#157], functions=[count(1)])
   +- Exchange hashpartitioning(OAZON#157, 200), ENSURE_REQUIREMENTS, [id=#904]
      +- HashAggregate(keys=[OAZON#157], functions=[partial_count(1)])
         +- FileScan csv [OAZON#157] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/ggombos/dolgozo.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<OAZON:int>




In [49]:
df.groupBy("OAZON").count().explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[OAZON#157], functions=[count(1)])
   +- Exchange hashpartitioning(OAZON#157, 200), ENSURE_REQUIREMENTS, [id=#860]
      +- HashAggregate(keys=[OAZON#157], functions=[partial_count(1)])
         +- FileScan csv [OAZON#157] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/ggombos/dolgozo.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<OAZON:int>




In [50]:
retail = spark.read.format('csv').option('header',True)\
.option("inferSchema",True).load("online_retail_data.csv").show(5)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01/12/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01/12/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows



- Melyik országban él a legtöbb vásárló?
- Melyik tranzakció során vásároltak a legtöbb különböző terméket és hányat?
- Átlagosan hány különböző terméket vesz egy vásárló egy vásárlás során?
- Melyik a legnépszerűbb termék?
- Melyik termék termelte a legnagyobb bevételt? (darab*unitprice)
- Adjuk meg azt az 5 országot, amelyből a legtöbb bevétel származik?