Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/airflow #16

Merged
merged 2 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions fundamental_data/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM apache/airflow:2.1.2-python3.9
COPY fundamental_data/requirements.txt .
COPY src/domain/utils/fetch_data.py ./src/domain/utils/
COPY src/domain/utils/new_earnings.py ./src/domain/utils/
RUN pip install -r requirements.txt
65 changes: 65 additions & 0 deletions fundamental_data/dags/fetch_fundamental_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime
import sys, os
import pandas as pd
import numpy as np
from selenium import webdriver
from selenium.webdriver.chrome.service import Service # new line caused by DeprecationWarning: executable_path has been deprecated, please pass in a Service object
from selenium.webdriver.chrome.options import Options # new line for DevToolsActivePort issue
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from src.domain.utils.fetch_data import fetch_fundamental#, fetch_stock


fetch_data_dag = DAG(
dag_id="fetch_stock_data_dag",
doc_md="""# Fetching fundamental data DAG
This `DAG` :

* extract and transform stocks data from Zacks website

This DAG has been made by the trading bot datascientest team

""",
schedule_interval="0 6 * * 1-5",
tags=['tradingbot'],
default_args={
'start_date': datetime(2023,7,21)
}
)

tickers = ["AAPL", "TSLA"]

def fetch_all_fundamental():
X_fundamental = pd.DataFrame(columns=["date", "stock"]).set_index("date")
for ticker in tickers:
# Get fundamental data
fundamental_data = fetch_fundamental(ticker)
# Add stock identifier to merge with market data
fundamental_data["stock"] = ticker
# Append
X_fundamental = pd.concat([X_fundamental, fundamental_data])
# Get US bond data (more than 31 days to make sure to be able to compute MoM)
us_bond = US_bond_yfinance.get_bonds(historical_days = 35)
X_fundamental = X_fundamental.join(us_bond, how = 'left')
# Get VIX data
vix_df = VIX.get_vix(historical_days=35)
X_fundamental = X_fundamental.join(vix_df, how = 'left')
# Get fundamental features
X_fundamental = fundamental_features_engineering(X_fundamental)
# not here
# # Sector encoding with original label encoder
# X_fundamental.loc[:, "sector"] = sector_encoder.transform(X_fundamental["sector"])
now = datetime.now().strftime("%Y%m%d %H:%M")
X_fundamental.to_csv("./log"+now+".csv")


task_1 = PythonOperator(
task_id='fetch_all_fundamental',
dag=fetch_data_dag,
python_callable=fetch_all_fundamental
)
163 changes: 163 additions & 0 deletions fundamental_data/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:master-python3.8
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# AIRFLOW_GID - Group ID in Airflow containers
# Default: 50000
#
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested).
# Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested).
# Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
# Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: "3"
x-airflow-common: &airflow-common
build:
context: ../
dockerfile: fundamental_data/Dockerfile
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ""
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "true"
AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
AIRFLOW__SMTP__SMTP_HOST: "smtp.gmail.com"
AIRFLOW__SMTP__SMTP_PORT: 587
AIRFLOW__SMTP__SMTP_USER: "[email protected]"
AIRFLOW__SMTP__SMTP_PASSWORD: "cfsrvkongsobheta"
AIRFLOW__SMTP__SMTP_MAIL_FROM: "[email protected]"

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- pymongo}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy

services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always

redis:
image: redis:latest
ports:
- 6379:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always

airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always

airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test:
[
"CMD-SHELL",
'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"',
]
interval: 10s
timeout: 10s
retries: 5
restart: always

airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
restart: always

airflow-init:
<<: *airflow-common
command: version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: "true"
_AIRFLOW_WWW_USER_CREATE: "true"
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

flower:
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always

volumes:
postgres-db-volume:
95 changes: 95 additions & 0 deletions fundamental_data/fetch_fundamental_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime
import sys, os
import pandas as pd
import numpy as np
from selenium import webdriver
from selenium.webdriver.chrome.service import Service # new line caused by DeprecationWarning: executable_path has been deprecated, please pass in a Service object
from selenium.webdriver.chrome.options import Options # new line for DevToolsActivePort issue
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from fetch_data import fetch_fundamental


fetch_data_dag = DAG(
dag_id="fetch_stock_data_dag",
doc_md="""# Fetching fundamental data DAG
This `DAG` :

* extract and transform stocks data from Zacks website

This DAG has been made by the trading bot datascientest team

""",
schedule_interval="0 6 * * 1-5",
tags=['tradingbot'],
default_args={
'start_date': datetime(2023,7,19)
}
)

tickers = ["AAPL", "TSLA"]

def fetch_fundamental(ticker, historical_days=35):
# Get close data to compute peRatio and DividendsYield
fundamental_data = fetch_stock(ticker, historical_days=historical_days)["close"]
# Get stock dividents & earnings
stock_earnings, stock_dividends = new_earnings.get_earn_and_dividends(ticker, inference=True)
# Combining data with earnings & dividends info
fundamental_data = pd.concat([fundamental_data, stock_earnings], axis=1, join="inner")
fundamental_data = fundamental_data.join(stock_dividends, how = 'left')
# Get stock sector and industry from yahoo finance
stock_metadata = yf.Ticker(ticker).info
# They are not always there
try:
fundamental_data['sector'] = stock_metadata['sector']
except:
fundamental_data['sector'] = "Industrials" #to check
try:
fundamental_data['industry'] = stock_metadata['industry']
except:
fundamental_data['industry'] = float("nan")
# # Get VIX Volatility index and join
# vix_df = VIX.get_vix(historical_days=historical_days)
# fundamental_data = fundamental_data.join(vix_df, how = 'left')
# # Get 10Y_bond index and combine data with US Bonds
# us_bond = US_bond_yfinance.get_bonds(historical_days = 35)
# fundamental_data = fundamental_data.join(us_bond, how = 'left')
# Sorting values by date
fundamental_data.sort_values(by = 'date', axis = 0, ascending = True, inplace = True)

return fundamental_data

def fetch_all_fundamental():
X_fundamental = pd.DataFrame(columns=["date", "stock"]).set_index("date")
for ticker in tickers:
# Get fundamental data
fundamental_data = fetch_fundamental(ticker)
# Add stock identifier to merge with market data
fundamental_data["stock"] = ticker
# Append
X_fundamental = pd.concat([X_fundamental, fundamental_data])
# Get US bond data (more than 31 days to make sure to be able to compute MoM)
us_bond = US_bond_yfinance.get_bonds(historical_days = 35)
X_fundamental = X_fundamental.join(us_bond, how = 'left')
# Get VIX data
vix_df = VIX.get_vix(historical_days=35)
X_fundamental = X_fundamental.join(vix_df, how = 'left')
# Get fundamental features
X_fundamental = fundamental_features_engineering(X_fundamental)
# not here
# # Sector encoding with original label encoder
# X_fundamental.loc[:, "sector"] = sector_encoder.transform(X_fundamental["sector"])
now = datetime.now().strftime("%Y%m%d %H:%M")
X_fundamental.to_csv("./log"+now+".csv")


task_1 = PythonOperator(
task_id='fetch_all_fundamental',
dag=fetch_data_dag,
python_callable=fetch_all_fundamental
)
1 change: 1 addition & 0 deletions fundamental_data/logs/scheduler/latest
3 changes: 3 additions & 0 deletions fundamental_data/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
selenium
webdriver-manager
yfinance==0.2.23
Loading