DE임태규 - W5M2 #285

Open
wants to merge 10 commits into base: DE임태규_W5
4 changes: 4 additions & 0 deletions .gitignore
@@ -1,3 +1,7 @@
####### Large files from the missions
w2/sentiment_analysis/tweets.csv
w2/sentiment_analysis/webtoon_analysis/webtoon_comments.db

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
33 changes: 0 additions & 33 deletions missions/W1/mtcars.csv

This file was deleted.

Binary file removed slides/W1 Introduction to Data Engineering.pdf
Binary file not shown.
Binary file removed slides/W2 Introduction to Big Data.pdf
Binary file not shown.
157 changes: 157 additions & 0 deletions w5/m2/W5M2.ipynb
@@ -0,0 +1,157 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# W5M2 - Optimization"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 라이브러리 및 세션 설정"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark import SparkConf\n",
"from pyspark.sql import SparkSession, Row\n",
"from pyspark.sql.functions import isnull, avg, min, date_format\n",
"from operator import add\n",
"\n",
"spark = SparkSession.builder \\\n",
" .master('spark://spark-master:7077') \\\n",
" .appName('W5M2') \\\n",
" .config('spark.executor.memory', '4gb') \\\n",
" .config(\"spark.executor.cores\", \"5\") \\\n",
" .getOrCreate()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 데이터 로딩"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"TLC_data_path = 'hdfs://spark-master:9000/user/hduser/hdfs_data/fhvhv_tripdata_2023-01.parquet'\n",
"weather_data_path = 'hdfs://spark-master:9000/user/hduser/hdfs_data/weather.csv'\n",
"output_dir_path = 'hdfs://spark-master:9000/user/spark_user/W5M2_output/'\n",
"tlc_ext = 'parquet'\n",
"weather_ext = 'csv'\n",
"\n",
"def load_dataframe(spark_session, file_path, extension):\n",
" if extension == \"csv\":\n",
" df = spark_session.read.csv(file_path, header=True, inferSchema=True)\n",
" elif extension == \"parquet\":\n",
" df = spark_session.read.parquet(file_path)\n",
" else:\n",
" raise NotImplementedError(\"Unsupported file extension.\")\n",
" return df\n",
"\n",
"df = load_dataframe(spark, TLC_data_path, tlc_ext)\n",
"print(\"- The schema of the TLC DataFrame - \\n\", df.schema)\n",
"df.show(1, vertical=True)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 데이터 클리닝"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Remove invalid or null entries and filter out unrealistic values\n",
"df = df.na.drop('any').filter(df.driver_pay > 0).filter(df.base_passenger_fare > 0)\n",
"df.show(5) # Check the top 5 rows after cleaning"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 데이터 변환"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Apply various transformations\n",
"df = df.withColumn(\"pickup_date\", date_format(df.pickup_datetime, 'yyyy-MM-dd'))\n",
"df = df.select(\"pickup_date\", \"base_passenger_fare\", \"trip_miles\")\n",
"df.cache()\n",
"\n",
"short_trip_df = df.filter(df.trip_miles < 10)\n",
"per_day_total_revenue_df = df.groupBy(\"pickup_date\").sum(\"base_passenger_fare\").orderBy(\"pickup_date\")\n",
"per_day_avg_trip_miles_df = df.groupBy(\"pickup_date\").mean(\"trip_miles\").orderBy(\"pickup_date\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 데이터 액션 및 저장"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Execute actions to trigger the transformations\n",
"print(\"Sample Short Trip Data: \", short_trip_df.take(1))\n",
"print(\"Sample Per Day Total Revenue: \", per_day_total_revenue_df.take(1))\n",
"print(\"Sample Per Day Average Trip Miles: \", per_day_avg_trip_miles_df.take(1))\n",
"\n",
"# Save the results to specified storage format\n",
"df.coalesce(1).write.mode('overwrite').csv(output_dir_path + \"df\")\n",
"short_trip_df.coalesce(1).write.mode('overwrite').csv(output_dir_path + \"short_trip_df\")\n",
"per_day_total_revenue_df.coalesce(1).write.mode('overwrite').csv(output_dir_path + \"per_day_total_revenue_df\")\n",
"per_day_avg_trip_miles_df.coalesce(1).write.mode('overwrite').csv(output_dir_path + \"per_day_avg_trip_miles_df\")\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "base",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
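
Note on the aggregation step in the notebook above: groupBy().sum() / .mean() keep Spark's auto-generated column names (e.g. sum(base_passenger_fare), avg(trip_miles)), and the CSVs are written without a header row. A minimal alternative sketch, reusing df and output_dir_path from the notebook; the explicit aliases and the header option are illustrative additions, not part of this PR:

from pyspark.sql.functions import sum as spark_sum, avg as spark_avg

# Same per-day aggregations as in the notebook, but with explicit aliases so the
# saved CSV columns get readable names instead of sum(...)/avg(...).
per_day_total_revenue_df = (
    df.groupBy("pickup_date")
      .agg(spark_sum("base_passenger_fare").alias("total_revenue"))
      .orderBy("pickup_date")
)
per_day_avg_trip_miles_df = (
    df.groupBy("pickup_date")
      .agg(spark_avg("trip_miles").alias("avg_trip_miles"))
      .orderBy("pickup_date")
)

# Writing with a header keeps those names in the output files.
per_day_total_revenue_df.coalesce(1).write.mode("overwrite") \
    .option("header", True).csv(output_dir_path + "per_day_total_revenue_df")
per_day_avg_trip_miles_df.coalesce(1).write.mode("overwrite") \
    .option("header", True).csv(output_dir_path + "per_day_avg_trip_miles_df")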
78 changes: 78 additions & 0 deletions w5/m2/docker-compose.yml
@@ -0,0 +1,78 @@
services:
  spark-master:
    container_name: spark-master
    hostname: spark-master
    build: .
    image: spark-standalone-cluster
    entrypoint: ['./entrypoint.sh', 'master']
    volumes:
      - spark-logs:/home/spark_user/spark/spark-events
      - W4M2:/home/spark_user/code
      - namenode:/home/hduser/data
    ports:
      - '8080:8080'
      - '7077:7077'
      - '8888:8888'
      - "9870:9870"
      - "8088:8088"
    networks:
      - spark

  spark-history-server:
    container_name: spark-history
    hostname: spark-history-server
    build: .
    entrypoint: ['./entrypoint.sh', 'history']
    depends_on:
      - spark-master
    volumes:
      - spark-logs:/home/spark_user/spark/spark-events
      - datanode0:/home/hduser/data
    ports:
      - '18080:18080'
    networks:
      - spark

  spark-worker1:
    container_name: spark-worker1
    hostname: spark-worker1
    build: .
    entrypoint: ['./entrypoint.sh', 'worker']
    depends_on:
      - spark-master
    volumes:
      - spark-logs:/home/spark_user/spark/spark-events
      - datanode1:/home/hduser/data
    ports:
      - '11111:8081'
    networks:
      - spark

  spark-worker2:
    container_name: spark-worker2
    hostname: spark-worker2
    build: .
    entrypoint: ['./entrypoint.sh', 'worker']
    depends_on:
      - spark-master
    volumes:
      - spark-logs:/home/spark_user/spark/spark-events
      - datanode2:/home/hduser/data
    ports:
      - '22222:8081'
    networks:
      - spark

volumes:
  spark-logs:
  W4M2:
  namenode:
  datanode0:
  datanode1:
  datanode2:

networks:
  spark:
    driver: bridge
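
The spark-logs volume above is mounted into the master, both workers, and the history server (exposed on port 18080), but finished applications only appear in the history server if the driver writes event logs. A small sketch of the corresponding session config, assuming the mount path /home/spark_user/spark/spark-events from this compose file; spark.eventLog.* are standard Spark options rather than something introduced by this PR:

from pyspark.sql import SparkSession

# Hypothetical variant of the notebook's session builder: enable event logging
# into the shared spark-logs volume so the run shows up in the history server
# (http://localhost:18080 on the host).
spark = SparkSession.builder \
    .master('spark://spark-master:7077') \
    .appName('W5M2') \
    .config('spark.eventLog.enabled', 'true') \
    .config('spark.eventLog.dir', 'file:///home/spark_user/spark/spark-events') \
    .getOrCreate()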