Assignment 5
1. Use a public dataset from GCP (https://console.cloud.google.com/marketplace/browse?filter=solution-type:dataset&pli=1). Try using the New York City Taxi dataset.
------------------------------------------------------------------------------------------------------------------------------------------------------------------
Refer to Assignment5_1.png
===================================================================================================================================================================
2. Create a Python script that will run as a job in the data pipeline. The script reads data from the NoSQL database and cleans it - removes null values from all columns and removes duplicate entries. The cleaned data is then written to a Parquet or ORC file (alternatively, to a JSON file).
Solution
-------
Verify that the BigQuery, Compute Engine, Dataproc, Cloud Composer and Google Cloud Storage APIs are enabled.
For the Cloud Composer environment creation, refer to Assignment5_2CloudComposer.png
In Cloud Shell, create the environment with the gcloud command below. The Airflow Variables set in the following commands will be available to all DAGs in this environment.
gcloud composer environments create demo-ephemeral-dataproc1 \
--location us-central1 \
--zone us-central1-b \
--machine-type n1-standard-2 \
--disk-size 20
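Environment creation can take a while. One way to confirm the environment came up healthy is to read back its state field (a sketch using the same environment name and location as the create command above):
gcloud composer environments describe demo-ephemeral-dataproc1 \
    --location us-central1 \
    --format="get(state)"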
# Set Airflow Variables in the Composer Environment we just created.
gcloud composer environments run \
demo-ephemeral-dataproc1 \
--location=us-central1 variables -- \
--set gcp_project $PROJECT
gcloud composer environments run demo-ephemeral-dataproc1 \
--location=us-central1 variables -- \
--set gce_zone us-central1-b
gcloud composer environments run demo-ephemeral-dataproc1 \
--location=us-central1 variables -- \
--set gcs_bucket $PROJECT
gcloud composer environments run demo-ephemeral-dataproc1 \
--location=us-central1 variables -- \
--set bq_output_table $PROJECT:ComposerDemo.nyc_tlc_yellow_trips
gcloud composer environments run demo-ephemeral-dataproc1 \
--location=us-central1 variables -- \
--set dataproc_bucket $PROJECT-dataproc-staging
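To confirm a variable landed, it can be read back through the same wrapper (a sketch assuming the Airflow 1.x CLI, where the variables command also accepts --get):
gcloud composer environments run demo-ephemeral-dataproc1 \
    --location=us-central1 variables -- \
    --get gcp_project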
Check the DAG folder for the Composer environment.
Refer to Assignment5_2DAG.png
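The DAG folder is a GCS prefix created together with the environment; one way to locate it from the shell (the field name in the environment description is config.dagGcsPrefix):
gcloud composer environments describe demo-ephemeral-dataproc1 \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"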
import os

from google.cloud import datastore
import pandas as pd

# Read the NYC taxi entities from Datastore into a DataFrame.
client = datastore.Client()
query = client.query(kind='nyc')
query_iter = query.fetch()
df = pd.DataFrame(query_iter)
print("Rows in the Dataframe: {}.".format(len(df)))

# Drop rows with null values in any column.
df = df.dropna(how='any', axis=0)
print("Dataframe rows after dropping nulls: {}.".format(len(df)))

# Drop duplicate rows.
df = df.drop_duplicates()
print("Dataframe rows after dropping duplicates: {}.".format(len(df)))

# Write the cleaned data to a Parquet file and copy it to GCS.
os.system('rm -f nyc.parquet')
df.to_parquet("nyc.parquet")
os.system('gsutil cp nyc.parquet gs://dataproc-nyctaxi/dphWeek5/')
==================================================================================
Q3 and Q4 (Airflow).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('NYC Taxi').getOrCreate()

# Read the cleaned Parquet file produced by the cleaning job above.
nyctaxi_df = spark.read.parquet("gs://dataproc-nyctaxi/dphWeek5/nyc.parquet")

# Summary statistics for every column.
nyctaxi_df.describe().show()

# Mean trip distance across all trips.
nyctaxi_df.agg({'trip_distance': 'mean'}).show()

# Trip counts and mean trip distance per passenger count.
nyctaxi_df.groupby('passenger_count').count().orderBy(nyctaxi_df.passenger_count.desc()).show()
nyctaxi_df.groupby('passenger_count').agg({'trip_distance': 'mean'}).show()
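To sanity-check this analysis outside of Airflow, it can be submitted directly to an existing Dataproc cluster. A minimal sketch, where nyc_analysis.py and demo-cluster are hypothetical names for the saved script and an already-running cluster:
# Copy the script to GCS and submit it as a PySpark job.
gsutil cp nyc_analysis.py gs://dataproc-nyctaxi/dphWeek5/spark-jobs/nyc_analysis.py
gcloud dataproc jobs submit pyspark \
    gs://dataproc-nyctaxi/dphWeek5/spark-jobs/nyc_analysis.py \
    --cluster=demo-cluster \
    --region=us-central1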
----------------------------------------------------------------------------------------------
Get the latest Python script for making IAP requests and install its requirements.
# Install necessary requirements for making iap requests with
# dag_trigger.py
(curl https://raw.githubusercontent.com/GoogleCloudPlatform/python-docs-samples/master/iap/requirements.txt; echo 'tzlocal>=1.5.1') >> ~/professional-services/examples/cloud-composer-examples/composer_http_post_example/iap_requirements.txt
# Get latest version of make_iap_request from python-docs-samples.
curl https://raw.githubusercontent.com/GoogleCloudPlatform/python-docs-samples/master/iap/make_iap_request.py >> ~/professional-services/examples/cloud-composer-examples/composer_http_post_example/make_iap_request.py
Create a service account to facilitate triggering your DAG by a POST to an endpoint.
gcloud iam service-accounts create dag-trigger
# Give service account permissions to create tokens for
# iap requests.
gcloud projects add-iam-policy-binding $PROJECT \
--member \
serviceAccount:dag-trigger@$PROJECT.iam.gserviceaccount.com \
--role roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding $PROJECT \
--member \
serviceAccount:dag-trigger@$PROJECT.iam.gserviceaccount.com \
--role roles/iam.serviceAccountActor
# Service account also needs to be authorized to use Composer.
gcloud projects add-iam-policy-binding $PROJECT \
--member \
serviceAccount:dag-trigger@$PROJECT.iam.gserviceaccount.com \
--role roles/composer.user
# We need a service account key to trigger the dag.
gcloud iam service-accounts keys create ~/$PROJECT-dag-trigger-key.json \
--iam-account=dag-trigger@$PROJECT.iam.gserviceaccount.com
# Finally, use this key as your application credentials by setting the
# environment variable on the machine where you will run dag_trigger.py.
export GOOGLE_APPLICATION_CREDENTIALS=~/$PROJECT-dag-trigger-key.json
Triggering the DAG
cd ~/professional-services/examples/cloud-composer-examples
# Here you set up the python environment.
# Pip is a tool, similar to maven in the java world
pip install --upgrade virtualenv
pip install -U pip
virtualenv composer-env
source composer-env/bin/activate
# By default one of Airflow's dependencies installs a GPL dependency
#(unidecode). To avoid this dependency set
# SLUGIFY_USES_TEXT_UNIDECODE=yes in your environment when you
# install or upgrade Airflow.
export SLUGIFY_USES_TEXT_UNIDECODE=yes
# Install requirements for this and other examples in
# cloud-composer-examples
pip install -r requirements.txt
# Required for dag_trigger.py
pip install -r iap_requirements.txt
# (Optional for testing spark code locally).
# pip install pyspark>=2.3.1
Find the URL of your Airflow web server (which serves the API for this DAG) and the IAP client ID for Airflow.
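A sketch of how to look these up: the airflowUri field of the environment holds the web server URL, and the IAP client ID appears in the OAuth redirect returned to an unauthenticated request (the grep pattern below is an assumption about the shape of that redirect):
AIRFLOW_URL=$(gcloud composer environments describe demo-ephemeral-dataproc1 \
    --location us-central1 --format="get(config.airflowUri)")
echo $AIRFLOW_URL
# The 302 redirect to the IAP sign-in page carries the IAP client ID
# in its client_id query parameter.
curl -sI "$AIRFLOW_URL" | grep -o "client_id=[^&]*"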
dag_trigger.py
import argparse
from datetime import datetime
import json

from tzlocal import get_localzone

import make_iap_request as iap


def main():
    """This main function calls the make_iap_request function which is defined
    at
    https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/iap/make_iap_request.py
    and then prints the output of the function. The make_iap_request function
    demonstrates how to authenticate to Identity-Aware Proxy using a service
    account.

    Returns:
      A string containing the page body, or raises an exception if the page
      couldn't be retrieved.
    """
    _LOCAL_TZ = get_localzone()
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--url",
        dest='url',
        required=True,
        help="The url of a resource sitting behind identity-aware proxy.")
    parser.add_argument("--iapClientId",
                        dest='iapClientId',
                        required=True,
                        help="The Client ID of the IAP OAuth Client.")
    parser.add_argument("--raw_path",
                        dest='raw_path',
                        required=True,
                        help="GCS path to raw files.")
    args = parser.parse_args()

    # Force trailing slash because logic in the average-speed DAG expects it this way.
    raw_path = args.raw_path if args.raw_path.endswith(
        '/') else args.raw_path + '/'
    bucket = raw_path.lstrip('gs://').split('/')[0]

    # This transformed path is relative to the bucket Variable in the Airflow environment.
    # Note, the gs://<bucket> prefix is stripped because the GoogleCloudStorageToBigQueryOperator
    # expects the source_objects as relative to the bucket param.
    transformed_path = raw_path.replace('/raw-', '/transformed-').replace(
        'gs://' + bucket + '/', '')
    failed_path = raw_path.replace('/raw-',
                                   '/failed-').replace('gs://' + bucket + '/',
                                                       '')
    # Note, we need to remove the trailing slash because of how the spark
    # saveAsTextFile method works.
    transformed_path = transformed_path.rstrip('/')

    # Place parameters to be passed as part of the dag_run triggered by this POST here.
    # In this example we will pass the path where the raw files are and the path where we
    # should place the transformed files.
    conf = {
        'raw_path': raw_path,
        'transformed_path': transformed_path,
        'failed_path': failed_path,
    }

    # The api signature requires a unique run_id.
    payload = {
        'run_id':
            'post-triggered-run-%s' %
            datetime.now(_LOCAL_TZ).strftime('%Y%m%d%H%M%s%Z'),
        'conf':
            json.dumps(conf),
    }

    return iap.make_iap_request(args.url,
                                args.iapClientId,
                                method='POST',
                                data=json.dumps(payload))


if __name__ == "__main__":
    main()
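With GOOGLE_APPLICATION_CREDENTIALS exported, a sketch of the trigger call (the experimental REST API path for Composer 1 and the raw-path value are assumptions; substitute the Airflow URL and IAP client ID found above):
python dag_trigger.py \
    --url="https://<your-airflow-url>/api/experimental/dags/average-speed/dag_runs" \
    --iapClientId="<your-iap-client-id>.apps.googleusercontent.com" \
    --raw_path="gs://$PROJECT/raw-$(date +%Y%m%d%H%M%S)"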
-----------------------------------------------------------------------------------------
Jobs for Airflow
---------------
from datetime import datetime, timedelta

# Imports below assume Airflow 1.x (Cloud Composer 1) contrib operators.
from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.dataproc_operator import (
    DataprocClusterCreateOperator, DataProcPySparkOperator,
    DataprocClusterDeleteOperator)
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.trigger_rule import TriggerRule

DEFAULT_DAG_ARGS = {
    'owner': 'airflow',  # The owner of the task.
    # Task instance should not rely on the previous task's schedule to succeed.
    'depends_on_past': False,
    # We use this in combination with schedule_interval=None to only trigger the DAG with a
    # POST to the REST API.
    # Alternatively, we could set this to yesterday and the dag will be triggered upon upload
    # to the dag folder.
    'start_date': datetime(2020, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,  # Retry once before failing the task.
    'retry_delay': timedelta(minutes=5),  # Time between retries.
    'project_id': Variable.get('gcp_project'),  # Cloud Composer project ID.
    # We only want the DAG to run when we POST to the api.
    # Alternatively, this could be set to '@daily' to run the job once a day.
    # More options at https://airflow.apache.org/scheduler.html#dag-runs
}

# Create Directed Acyclic Graph for Airflow.
with DAG('average-speed',
         default_args=DEFAULT_DAG_ARGS,
         schedule_interval=None) as dag:  # Here we are using dag as context.
    # Create the Cloud Dataproc cluster.
    # Note: this operator will be flagged a success if the cluster by this name already exists.
    create_cluster = DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # ds_nodash is an airflow macro for "[Execution] Date string no dashes"
        # in YYYYMMDD format. See docs https://airflow.apache.org/code.html?highlight=macros#macros
        cluster_name='ephemeral-spark-cluster-{{ ds_nodash }}',
        image_version='1.5-debian10',
        num_workers=2,
        storage_bucket=Variable.get('dataproc_bucket'),
        zone=Variable.get('gce_zone'))

    # Submit the PySpark job.
    submit_pyspark = DataProcPySparkOperator(
        task_id='run_dataproc_pyspark',
        main='gs://' + Variable.get('gcs_bucket') +
        '/spark-jobs/spark_avg_speed.py',
        # Obviously needs to match the name of cluster created in the prior Operator.
        cluster_name='ephemeral-spark-cluster-{{ ds_nodash }}',
        # Let's template our arguments for the pyspark job from the POST payload.
        arguments=[
            "--gcs_path_raw={{ dag_run.conf['raw_path'] }}",
            "--gcs_path_transformed=gs://{{ var.value.gcs_bucket }}" +
            "/{{ dag_run.conf['transformed_path'] }}"
        ])

    # Load the transformed files to a BigQuery table.
    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='GCS_to_BigQuery',
        bucket='{{ var.value.gcs_bucket }}',
        # Wildcard for objects created by the spark job to be written to BigQuery.
        # Reads the relative path to the objects transformed by the spark job from the POST message.
        source_objects=["{{ dag_run.conf['transformed_path'] }}/part-*"],
        destination_project_dataset_table='{{ var.value.bq_output_table }}',
        schema_fields=None,
        schema_object='schemas/nyc-tlc-yellow.json',  # Relative gcs path to schema file.
        source_format='CSV',  # Note that our spark job does json -> csv conversion.
        create_disposition='CREATE_IF_NEEDED',
        skip_leading_rows=0,
        write_disposition='WRITE_TRUNCATE',  # If the table exists, overwrite it.
        max_bad_records=0)

    # Delete the Cloud Dataproc cluster.
    delete_cluster = DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        # Obviously needs to match the name of cluster created in the prior two Operators.
        cluster_name='ephemeral-spark-cluster-{{ ds_nodash }}',
        # This will tear down the cluster even if there are failures in upstream tasks.
        trigger_rule=TriggerRule.ALL_DONE)

    # Delete gcs files in the timestamped transformed folder.
    delete_transformed_files = BashOperator(
        task_id='delete_transformed_files',
        bash_command="gsutil -m rm -r gs://{{ var.value.gcs_bucket }}" +
        "/{{ dag_run.conf['transformed_path'] }}/")

    # If the spark job or BQ Load fails we rename the timestamped raw path to
    # a timestamped failed path.
    move_failed_files = BashOperator(
        task_id='move_failed_files',
        bash_command="gsutil mv gs://{{ var.value.gcs_bucket }}" +
        "/{{ dag_run.conf['raw_path'] }}/ " +
        "gs://{{ var.value.gcs_bucket }}" +
        "/{{ dag_run.conf['failed_path'] }}/",
        trigger_rule=TriggerRule.ONE_FAILED)

    # Set the dag property of the first Operator; this will be inherited by downstream Operators.
    create_cluster.dag = dag
    create_cluster.set_downstream(submit_pyspark)
    submit_pyspark.set_downstream([delete_cluster, bq_load])
    bq_load.set_downstream(delete_transformed_files)
    move_failed_files.set_upstream([bq_load, submit_pyspark])
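For the DAG to be picked up, the DAG file has to land in the environment's DAG folder and the PySpark job where the submit_pyspark task expects it. A sketch under the assumption that the DAG above is saved as average_speed.py and the job as spark_avg_speed.py:
# Copy the PySpark job to the path referenced by the submit_pyspark task.
gsutil cp spark_avg_speed.py gs://$PROJECT/spark-jobs/spark_avg_speed.py
# Import the DAG file into the environment's DAG folder.
gcloud composer environments storage dags import \
    --environment demo-ephemeral-dataproc1 \
    --location us-central1 \
    --source average_speed.py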
-------------------------------------------
Refer to Assignment5_Airflow.png