Skip to content

Commit

Permalink
Merge pull request #9252 from davidcavazos/element-wise-with-timestamps
Browse files Browse the repository at this point in the history
[BEAM-7389] Add helper conversion samples and simplified tests
  • Loading branch information
aaltay authored Aug 5, 2019
2 parents 0d911b8 + ec8b65a commit 913f065
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,25 @@ def process(self, plant, timestamp=beam.DoFn.TimestampParam):
# [END processing_time]
if test:
test(plant_processing_times)


def time_tuple2unix_time():
    """Sample: turn a parsed time tuple into a Unix timestamp.

    A fixed date-time string is parsed into a `time.struct_time`, which
    `time.mktime` then converts to seconds since the epoch.

    Returns:
      The Unix time as a float. `time.mktime` reads the tuple as local
      time, so the exact value depends on the machine's time zone.
    """
    # [START time_tuple2unix_time]
    import time

    date_string = '2020-03-19 20:50:00'
    date_format = '%Y-%m-%d %H:%M:%S'
    time_tuple = time.strptime(date_string, date_format)
    unix_time = time.mktime(time_tuple)
    # [END time_tuple2unix_time]
    return unix_time


def datetime2unix_time():
    """Sample: turn the current `datetime` into a Unix timestamp.

    The current local date-time is reduced to a time tuple (which drops
    the sub-second part) and handed to `time.mktime`.

    Returns:
      The Unix time as a float with whole-second resolution.
    """
    # [START datetime2unix_time]
    import time
    import datetime

    time_tuple = datetime.datetime.now().timetuple()
    unix_time = time.mktime(time_tuple)
    # [END datetime2unix_time]
    return unix_time
Original file line number Diff line number Diff line change
Expand Up @@ -23,72 +23,80 @@

import mock

# pylint: disable=line-too-long
from apache_beam.examples.snippets.transforms.element_wise.with_timestamps import *
# pylint: enable=line-too-long
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

from . import with_timestamps


def check_plant_timestamps(actual):
    """Check the output of the `event_time` snippet.

    Args:
      actual: The value the snippet hands to its `test` callback; it is
        compared against the expected 'timestamp - plant' strings.
    """
    # [START plant_timestamps]
    plant_timestamps = [
        '2020-04-01 00:00:00 - Strawberry',
        '2020-06-01 00:00:00 - Carrot',
        '2020-03-01 00:00:00 - Artichoke',
        '2020-05-01 00:00:00 - Tomato',
        '2020-09-01 00:00:00 - Potato',
    ]
    # [END plant_timestamps]
    matches_expected = equal_to(plant_timestamps)
    assert_that(actual, matches_expected)


def check_plant_events(actual):
    """Check the output of the `logical_clock` snippet.

    Args:
      actual: The value the snippet hands to its `test` callback; it is
        compared against the expected 'event number - plant' strings.
    """
    # [START plant_events]
    plant_events = [
        '1 - Strawberry',
        '4 - Carrot',
        '2 - Artichoke',
        '3 - Tomato',
        '5 - Potato',
    ]
    # [END plant_events]
    matches_expected = equal_to(plant_events)
    assert_that(actual, matches_expected)


def check_plant_processing_times(actual):
    """Check the output of the `processing_time` snippet.

    The timestamps come from the wall clock and differ on every run, so
    only the text after the last '-' of each row is compared.

    Args:
      actual: The value the snippet hands to its `test` callback, holding
        'timestamp - plant' strings.
    """
    import apache_beam as beam

    # [START plant_processing_times]
    plant_processing_times = [
        '2020-03-20 20:12:42.145594 - Strawberry',
        '2020-03-20 20:12:42.145827 - Carrot',
        '2020-03-20 20:12:42.145962 - Artichoke',
        '2020-03-20 20:12:42.146093 - Tomato',
        '2020-03-20 20:12:42.146216 - Potato',
    ]
    # [END plant_processing_times]

    def drop_timestamp(row):
        # Since `time.time()` will always give something different, we'll
        # simply strip the timestamp information before testing the results.
        return row.split('-')[-1].strip()

    expected_names = list(map(drop_timestamp, plant_processing_times))
    actual_names = actual | beam.Map(drop_timestamp)
    assert_that(actual_names, equal_to(expected_names))


@mock.patch('apache_beam.Pipeline', TestPipeline)
# pylint: disable=line-too-long
@mock.patch('apache_beam.examples.snippets.transforms.element_wise.with_timestamps.print', lambda elem: elem)
# pylint: enable=line-too-long
class WithTimestampsTest(unittest.TestCase):
    """Exercises every sample in the `with_timestamps` snippet module.

    While each test runs, `apache_beam.Pipeline` is swapped for
    `TestPipeline` and the snippet module's `print` is replaced by an
    identity function, so the samples run without writing to stdout.

    The expected outputs live in the module-level `check_*` helpers. The
    tests reference the snippets through the `with_timestamps` module
    rather than through wildcard-imported names, so each pipeline is built
    and verified exactly once per test and each documentation region tag
    is declared in a single place.
    """

    def test_event_time(self):
        # The snippet calls the checker with its resulting PCollection.
        with_timestamps.event_time(check_plant_timestamps)

    def test_logical_clock(self):
        with_timestamps.logical_clock(check_plant_events)

    def test_processing_time(self):
        # The checker ignores the wall-clock part of every row.
        with_timestamps.processing_time(check_plant_processing_times)

    def test_time_tuple2unix_time(self):
        # The value depends on the local time zone, so only the type
        # of the result is pinned here.
        unix_time = with_timestamps.time_tuple2unix_time()
        self.assertIsInstance(unix_time, float)

    def test_datetime2unix_time(self):
        # The value depends on the current time, so only the type of
        # the result is pinned here.
        unix_time = with_timestamps.datetime2unix_time()
        self.assertIsInstance(unix_time, float)


if __name__ == '__main__':
Expand Down

0 comments on commit 913f065

Please sign in to comment.