Skip to content

Commit

Permalink
Merge pull request #75 from dataforgoodfr/feat/add-new-alert
Browse files Browse the repository at this point in the history
Feat/add new alert
  • Loading branch information
RonanMorgan authored Nov 24, 2023
2 parents b60b619 + 665464c commit 04d45f7
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 30 deletions.
16 changes: 5 additions & 11 deletions alembic/versions/1fd83d22bd1e_create_alert_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

import sqlalchemy as sa
from sqlalchemy import Inspector

from sqlalchemy.dialects.postgresql import ARRAY
from alembic import op

# revision identifiers, used by Alembic.
revision = "1fd83d22bd1e"
down_revision = "e52b9542531c"
down_revision = "68c9f220a07f"
branch_labels = None
depends_on = None

Expand All @@ -28,9 +28,10 @@ def upgrade() -> None:
primary_key=True,
index=True,
),
sa.Column("timestamp", sa.DateTime),
sa.Column("timestamp", sa.DateTime, index=True, nullable=False),
sa.Column("vessel_id", sa.Integer, index=True, nullable=False),
sa.Column("mpa_id", sa.Integer, index=True, nullable=False),
sa.Column("cross_mpa", sa.Integer, nullable=False),
sa.Column("mpa_ids", ARRAY(sa.BigInteger), nullable=False),
keep_existing=False,
)

Expand All @@ -42,13 +43,6 @@ def upgrade() -> None:
["id"],
)


# op.create_foreign_key(
# "fk_alert_mpa",
# "alert",
# "mpa",


def downgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

# revision identifiers, used by Alembic.
revision = "68c9f220a07f"
down_revision = "1fd83d22bd1e"
down_revision = "e52b9542531c"
branch_labels = None
depends_on = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ def upgrade() -> None:
index=True,
default=uuid.uuid4,
),
sa.Column("timestamp", sa.DateTime),
sa.Column("timestamp", sa.DateTime, index=True),
sa.Column("ship_name", sa.String),
sa.Column("IMO", sa.String),
sa.Column("vessel_id", sa.Integer, index=True, nullable=False),
sa.Column("mmsi", sa.Integer, index=True),
sa.Column("mmsi", sa.Integer),
sa.Column("last_position_time", sa.DateTime),
sa.Column("fishing", sa.Boolean),
sa.Column("at_port", sa.Boolean),
Expand All @@ -56,11 +56,11 @@ def upgrade() -> None:
index=True,
default=uuid.uuid4,
),
sa.Column("timestamp", sa.DateTime),
sa.Column("timestamp", sa.DateTime, index=True),
sa.Column("ship_name", sa.String),
sa.Column("IMO", sa.String),
sa.Column("vessel_id", sa.Integer, index=True, nullable=False),
sa.Column("mmsi", sa.Integer, index=True),
sa.Column("mmsi", sa.Integer),
sa.Column("last_position_time", sa.DateTime),
sa.Column("fishing", sa.Boolean),
sa.Column("at_port", sa.Boolean),
Expand Down
6 changes: 3 additions & 3 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def main() -> None:
)
args = parser.parse_args()
use_cases = UseCases()
marine_traffic_usecase = use_cases.scrap_marine_data_usecase()
#marine_traffic_usecase = use_cases.scrap_marine_data_usecase()
spire_traffic_usecase = use_cases.get_spire_data_usecase()
alert_usecase = use_cases.generate_alert_usecase()
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
Expand All @@ -41,7 +41,7 @@ def main() -> None:
spire_traffic_usecase.save_vessels(
spire_traffic_usecase.get_all_vessels(timestamp),
)
marine_traffic_usecase.scrap_vessels(timestamp)
#marine_traffic_usecase.scrap_vessels(timestamp)
alert_usecase.generate_alerts(timestamp)
while True:
scheduler.start()
Expand All @@ -50,7 +50,7 @@ def main() -> None:
spire_traffic_usecase.save_vessels(
spire_traffic_usecase.get_all_vessels(timestamp),
)
marine_traffic_usecase.scrap_vessels(timestamp)
#marine_traffic_usecase.scrap_vessels(timestamp)
alert_usecase.generate_alerts(timestamp)


Expand Down
44 changes: 33 additions & 11 deletions bloom/infra/repositories/repository_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,40 @@ def save_alerts(self, timestamp: datetime) -> None:
with self.session_factory() as session:
sql = text(
f"""
INSERT INTO alert(timestamp,vessel_id,mpa_id)
(SELECT spire_vessel_positions.timestamp,
spire_vessel_positions.vessel_id,mpa.index
FROM spire_vessel_positions
JOIN mpa
ON ST_Contains(mpa.geometry, spire_vessel_positions.position)
WHERE spire_vessel_positions.timestamp = '{timestamp}');
INSERT INTO alert(timestamp,vessel_id,cross_mpa,mpa_ids)
(
SELECT timestamp, vessel_id, (CAST(ST_Contains(mpa_fr_with_mn.geometry,current_position) AS INT) - CAST(ST_Contains(mpa_fr_with_mn.geometry,previous_position) AS INT)) as cross_mpa, ARRAY_AGG(mpa_fr_with_mn.index ORDER BY mpa_fr_with_mn.index DESC) AS mpa_ids FROM
(SELECT spire_vessel_positions.vessel_id AS vessel_id,
spire_vessel_positions.position AS current_position,
spire_vessel_positions.timestamp AS timestamp,
LAG(spire_vessel_positions.position) OVER (PARTITION BY spire_vessel_positions.vessel_id ORDER BY spire_vessel_positions.timestamp) AS previous_position
FROM spire_vessel_positions WHERE spire_vessel_positions.timestamp >= TIMESTAMP '{timestamp}' - INTERVAL '15 minutes' AND spire_vessel_positions.timestamp < TIMESTAMP '{timestamp}' + INTERVAL '15 minutes' ) AS foo
CROSS JOIN mpa_fr_with_mn WHERE previous_position IS NOT NULL and ST_Contains(mpa_fr_with_mn.geometry,current_position) != ST_Contains(mpa_fr_with_mn.geometry,previous_position) GROUP BY vessel_id, timestamp,cross_mpa
);
""", # nosec: B608
)
session.execute(sql)
session.commit()
return

# an other query with the same result :
# WITH cte_query1 AS (
# SELECT spire_vessel_positions.vessel_id AS vessel_id, ARRAY_AGG(mpa_fr_with_mn.index ORDER BY mpa_fr_with_mn.index DESC) AS mpa_ids
# FROM spire_vessel_positions
# JOIN mpa_fr_with_mn ON ST_Contains(mpa_fr_with_mn.geometry, spire_vessel_positions.position)
# WHERE spire_vessel_positions.timestamp = TO_TIMESTAMP('2023-11-17 12:00', 'YYYY-MM-DD HH24:MI')
# GROUP BY vessel_id
# ),
# cte_query2 AS (
# SELECT DISTINCT spire_vessel_positions.vessel_id AS vessel_id, ARRAY_AGG(mpa_fr_with_mn.index ORDER BY mpa_fr_with_mn.index DESC) AS mpa_ids
# FROM spire_vessel_positions
# JOIN mpa_fr_with_mn ON ST_Contains(mpa_fr_with_mn.geometry, spire_vessel_positions.position)
# WHERE spire_vessel_positions.timestamp = TO_TIMESTAMP('2023-11-17 12:15', 'YYYY-MM-DD HH24:MI')
# GROUP BY vessel_id
# )
# SELECT vessel_id, mpa_ids, -1 AS value FROM cte_query1 EXCEPT SELECT vessel_id, mpa_ids, -1 AS value FROM cte_query2
# UNION ALL
# SELECT vessel_id, mpa_ids, 1 AS value FROM cte_query2 EXCEPT SELECT vessel_id, mpa_ids, 1 AS value FROM cte_query1

def load_alert(self, timestamp: datetime) -> list[Alert]:
with self.session_factory() as session:
Expand All @@ -40,17 +62,17 @@ def load_alert(self, timestamp: datetime) -> list[Alert]:
sql = text(
f"""
SELECT timestamp, ship_name, mmsi, lp_time, position,
mpa."NAME", mpa."IUCN_CAT"
FROM (SELECT a.mpa_id as mpa_id, a.timestamp as timestamp,
mpa_fr_with_mn.name, mpa_fr_with_mn."IUCN_CAT"
FROM (SELECT a.mpa_ids as mpa_ids, a.timestamp as timestamp,
spire.ship_name as ship_name,
spire.mmsi as mmsi, spire.last_position_time as lp_time,
ST_AsText(spire.position) as position
FROM alert a
JOIN (SELECT * FROM spire_vessel_positions
WHERE spire_vessel_positions.timestamp = '{timestamp}') as spire
ON a.vessel_id = spire.vessel_id
WHERE a.timestamp = '{timestamp}') as habile
JOIN mpa ON mpa_id = mpa.index
WHERE a.timestamp = '{timestamp}' and cross_mpa = 1) as habile
JOIN mpa_fr_with_mn ON mpa_ids[1] = mpa_fr_with_mn.index
""", # nosec: B608
)
e = session.execute(sql)
Expand Down

0 comments on commit 04d45f7

Please sign in to comment.