-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: republish failed events from csv (#390)
Co-authored-by: Tim McCormack <[email protected]>
- Loading branch information
Showing
7 changed files
with
231 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
""" | ||
Publish events from a CSV to the Event Bus. | ||
This is meant to help republish failed events. The CSV may be an export from Splunk, or it may be manually created, as | ||
long as it has 'initial_topic', 'event_type', 'event_data_as_json', 'event_key_field', and 'event_metadata_as_json' | ||
columns. | ||
If the CSV is manually created, you will need to use the python __repr__ of all fields. For strings, including | ||
json dumps, this means enclosing them in single quotes | ||
Example row (second line split up for readability, in the csv it should be all one line) | ||
initial_topic,event_type,event_data_as_json,event_key_field,event_metadata_as_json | ||
'test-topic','org.openedx.test.event','{"user_data": {"id": 1, "is_active": true}}','user_data.id', | ||
'{"event_type": "org.openedx.test.event", "id": "12345", "minorversion": 0, "source": "openedx/cms/web", | ||
"sourcehost": "ip-10-3-16-25", "time": "2023-08-10T17:15:38.549331+00:00", "sourcelib": [8, 5, 0]}' | ||
This is created as a script instead of a management command because it's meant to be used as a one-off and not to | ||
require pip installing this package into anything else to run. However, since edx-event-bus-kafka does expect certain | ||
settings, the script must be run in an environment with DJANGO_SETTINGS_MODULE. | ||
To run: | ||
tox -e scripts -- python edx_arch_experiments/scripts/republish_failed_events.py | ||
--filename /Users/rgraber/oneoffs/failed_events.csv | ||
""" | ||
|
||
import csv | ||
import json | ||
import sys | ||
from ast import literal_eval | ||
|
||
import click | ||
from edx_event_bus_kafka.internal.producer import create_producer | ||
from openedx_events.tooling import EventsMetadata, OpenEdxPublicSignal, load_all_signals | ||
|
||
|
||
@click.command()
@click.option('--filename', type=click.Path(exists=True), required=True,
              help='Path to the CSV of failed events to republish.')
def read_and_send_events(filename):
    """
    Read failed events from the CSV at ``filename`` and republish them to the event bus.

    Exits with status 1 if the CSV is missing any required column, or if any row is
    missing a required value. Processing stops at the first incomplete row rather than
    skipping it, to avoid publishing later events out of order. Rows whose metadata id
    has already been seen in this run are skipped as duplicates.
    """
    load_all_signals()
    producer = create_producer()
    try:
        log_columns = ['initial_topic', 'event_type', 'event_data_as_json', 'event_key_field',
                       'event_metadata_as_json']
        with open(filename) as log_file:
            reader = csv.DictReader(log_file)
            # Make sure csv contains all necessary columns for republishing.
            # fieldnames is None for a completely empty file; treat that as all columns missing.
            missing_columns = set(log_columns).difference(set(reader.fieldnames or []))
            if missing_columns:
                print(f'Missing required columns {missing_columns}. Cannot republish events.')
                sys.exit(1)
            ids = set()
            for row in reader:
                # We log everything using __repr__, so strings get quotes around them and "None" gets
                # written literally. Use literal_eval to go from "None" to None and remove the extraneous
                # quotes from the logs.
                empties = [key for key, value in row.items()
                           if key in log_columns and literal_eval(value) is None]
                # If any row is missing data, stop processing the whole file to avoid sending events
                # out of order.
                if empties:
                    print(f'Missing required fields in row {reader.line_num}: {empties}. Will not continue publishing.')
                    sys.exit(1)

                topic = literal_eval(row['initial_topic'])
                event_type = literal_eval(row['event_type'])
                event_data = json.loads(literal_eval(row['event_data_as_json']))
                event_key_field = literal_eval(row['event_key_field'])
                metadata = EventsMetadata.from_json(literal_eval(row['event_metadata_as_json']))
                signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
                # Skip events we have already republished in this run (e.g. the same
                # failure exported twice).
                if metadata.id in ids:
                    print(f"Skipping duplicate id {metadata.id}")
                    continue
                ids.add(metadata.id)

                producer.send(signal=signal, event_data=event_data, event_key_field=event_key_field, topic=topic,
                              event_metadata=metadata)
                print(f'Successfully published event to event bus. line={reader.line_num} {event_data=} {topic=}'
                      f' {event_key_field=} metadata={metadata.to_json()}')
    finally:
        # Flush and close the producer even if we bailed out early via sys.exit or an error.
        producer.prepare_for_shutdown()
|
||
|
||
if __name__ == '__main__':
    # click injects the --filename argument at invocation time.
    read_and_send_events()  # pylint: disable=no-value-for-parameter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
""" | ||
Settings for running scripts in /scripts | ||
""" | ||
import os | ||
from os.path import abspath, dirname, join | ||
|
||
# Pull in optional developer-specific overrides when scripts/private.py exists
# alongside this settings module.
_private_settings_path = join(dirname(abspath(__file__)), 'private.py')
if os.path.isfile(_private_settings_path):
    from .private import *  # pylint: disable=import-error,wildcard-import
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# Requirements for running scripts | ||
|
||
-c constraints.txt | ||
|
||
-r base.txt # Core dependencies for this package | ||
edx-event-bus-kafka | ||
confluent-kafka[avro] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
# | ||
# This file is autogenerated by pip-compile with Python 3.8 | ||
# by the following command: | ||
# | ||
# make upgrade | ||
# | ||
asgiref==3.7.2 | ||
# via | ||
# -r requirements/base.txt | ||
# django | ||
attrs==23.1.0 | ||
# via openedx-events | ||
avro==1.11.2 | ||
# via confluent-kafka | ||
certifi==2023.7.22 | ||
# via requests | ||
cffi==1.15.1 | ||
# via | ||
# -r requirements/base.txt | ||
# pynacl | ||
charset-normalizer==3.2.0 | ||
# via requests | ||
click==8.1.5 | ||
# via | ||
# -r requirements/base.txt | ||
# code-annotations | ||
# edx-django-utils | ||
code-annotations==1.5.0 | ||
# via edx-toggles | ||
confluent-kafka[avro]==2.2.0 | ||
# via -r requirements/scripts.in | ||
django==3.2.20 | ||
# via | ||
# -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt | ||
# -r requirements/base.txt | ||
# django-crum | ||
# edx-django-utils | ||
# edx-event-bus-kafka | ||
# edx-toggles | ||
# openedx-events | ||
django-crum==0.7.9 | ||
# via | ||
# -r requirements/base.txt | ||
# edx-django-utils | ||
# edx-toggles | ||
django-waffle==3.0.0 | ||
# via | ||
# -r requirements/base.txt | ||
# edx-django-utils | ||
# edx-toggles | ||
edx-django-utils==5.5.0 | ||
# via | ||
# -r requirements/base.txt | ||
# edx-event-bus-kafka | ||
# edx-toggles | ||
edx-event-bus-kafka==5.3.1 | ||
# via -r requirements/scripts.in | ||
edx-opaque-keys[django]==2.4.0 | ||
# via openedx-events | ||
edx-toggles==5.1.0 | ||
# via edx-event-bus-kafka | ||
fastavro==1.8.2 | ||
# via | ||
# confluent-kafka | ||
# openedx-events | ||
idna==3.4 | ||
# via requests | ||
jinja2==3.1.2 | ||
# via code-annotations | ||
markupsafe==2.1.3 | ||
# via jinja2 | ||
newrelic==8.8.1 | ||
# via | ||
# -r requirements/base.txt | ||
# edx-django-utils | ||
openedx-events==8.5.0 | ||
# via edx-event-bus-kafka | ||
pbr==5.11.1 | ||
# via | ||
# -r requirements/base.txt | ||
# stevedore | ||
psutil==5.9.5 | ||
# via | ||
# -r requirements/base.txt | ||
# edx-django-utils | ||
pycparser==2.21 | ||
# via | ||
# -r requirements/base.txt | ||
# cffi | ||
pymongo==3.13.0 | ||
# via edx-opaque-keys | ||
pynacl==1.5.0 | ||
# via | ||
# -r requirements/base.txt | ||
# edx-django-utils | ||
python-slugify==8.0.1 | ||
# via code-annotations | ||
pytz==2023.3 | ||
# via | ||
# -r requirements/base.txt | ||
# django | ||
pyyaml==6.0.1 | ||
# via code-annotations | ||
requests==2.31.0 | ||
# via confluent-kafka | ||
sqlparse==0.4.4 | ||
# via | ||
# -r requirements/base.txt | ||
# django | ||
stevedore==5.1.0 | ||
# via | ||
# -r requirements/base.txt | ||
# code-annotations | ||
# edx-django-utils | ||
# edx-opaque-keys | ||
text-unidecode==1.3 | ||
# via python-slugify | ||
typing-extensions==4.7.1 | ||
# via | ||
# -r requirements/base.txt | ||
# asgiref | ||
urllib3==2.0.4 | ||
# via requests |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters