This chapter covers the processing of duplicate and unique records. We define duplicate records as two or more records that share the same value in a given field. Unique records are those for which no other record has the same value in that field. Note that in each case we must say which field(s) we mean when we say 'unique' or 'duplicate.' Pig is no different: by default the DISTINCT command uses all fields, but we can trim fields from data relations to evaluate uniqueness in terms of different fields.
We often find ourselves dealing with multiple records for a given concept or entity. At those times we may want to reduce our data to just one, unique instance of each key. We’ll introduce the operations UNION, DISTINCT and various DataFu UDFs (User Defined Functions) that achieve this.
We’ll also introduce set operations among relations using Pig, and set operations between data bags using DataFu UDFs.
It is often the case that you want to determine the unique set of values in a table or relation: that is, you want to remove duplicate values and retain only unique records. For instance, if you were creating a set of labels that describe items in an inventory, you would only want to see each label once in the final output, which you might use for a web page’s autocomplete form. The DISTINCT operator in Pig performs this operation.
Let’s begin with a familiar example: the park_team_years table. It contains a row for every team for every season and every park in which the team played. Let’s say we wanted to find which ballparks each team played in at least once. To find every distinct pair of team and home ballpark, we use Pig’s DISTINCT operator:
many_team_park_pairs = FOREACH park_team_years GENERATE
team_id,
park_id;
team_park_pairs = DISTINCT many_team_park_pairs;
DUMP @;
...
(WS8,WAS05)
(WS8,WOR02)
(WS9,WAS06)
(WSN,WAS06)
(WSU,WAS03)
This is equivalent to the SQL statement SELECT DISTINCT team_id, park_id FROM park_team_years. Don’t fall into the trap of using a GROUP statement to find distinct values:
dont_do_this = FOREACH (GROUP park_team_years BY (team_id, park_id)) GENERATE
group.team_id,
group.park_id;
The DISTINCT operation is able to use a combiner, eliminating duplicates at the mapper before shipping them to the reducer. This is a big win when there are frequent duplicates, especially if duplicates are likely to occur near each other. For example, duplicates in web logs (from refreshes, callbacks, etc.) will be sparse globally, but found often in the same log file.
The combiner may impose a minor penalty when there are very few or very sparse duplicates. In that case, you should still use DISTINCT, but disable combiners with the pig.exec.nocombiner=true setting.
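To make the combiner's benefit concrete, here is a minimal sketch in plain Python (not Pig) of the two-stage idea: each split drops its own duplicates before anything is shipped, and a final pass removes duplicates that span splits. The function name, the split layout and the sample team/park pairs are illustrative assumptions, not Pig internals.

```python
def distinct_with_combiner(splits):
    """Deduplicate within each split first, then across all splits."""
    shipped = []
    for records in splits:
        # Combiner step: a mapper drops its own duplicates before shipping.
        shipped.extend(set(records))
    # Reducer step: remove duplicates that appeared in different splits.
    return sorted(set(shipped)), len(shipped)

# Two hypothetical input splits holding (team_id, park_id) pairs.
splits = [
    [("BOS", "BOS07"), ("BOS", "BOS07"), ("NYA", "NYC16")],
    [("BOS", "BOS07"), ("NYA", "NYC16"), ("NYA", "NYC16")],
]
pairs, n_shipped = distinct_with_combiner(splits)
print(pairs)      # the distinct pairs
print(n_shipped)  # records shipped after the combiner step
```

Six input records shrink to four shipped records here. When duplicates rarely share a split, the first stage removes little, which is the case where disabling the combiner may pay off.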
We’ve seen how to eliminate duplicate records in relations - but what about within groups? For instance, what if we want to find what parks a team played in each year as a single record?
This can be done with the DISTINCT operator inside a nested FOREACH. Instead of finding every distinct team/home ballpark pair in a relation as we just did, let’s find the list of distinct home ballparks for each team, having performed a GROUP..BY on team_id:
-- Eliminating Duplicate Records from a Group
team_park_list = FOREACH (GROUP park_team_years BY team_id) {
parks = DISTINCT park_team_years.park_id;
GENERATE
group AS team_id,
BagToString(parks, '|');
};
DUMP @;
...
(SLN,KAN03|STL03|STL05|STL07|STL09|STL10)
(SLU,STL04)
(SR1,SYR01)
(SR2,SYR02|SYR03|THR01)
(TBA,LBV01|STP01|TOK01)
You may be familiar with the equivalent SQL:
SELECT team_id, GROUP_CONCAT(DISTINCT park_id ORDER BY park_id) AS park_ids
FROM park_team_years
GROUP BY team_id
ORDER BY team_id
;
The DataFu DistinctBy UDF selects a single record for each key in a bag: the first record it encounters for that key.
It has the nice feature of being order-preserving: only the first record for a key is output, and all records that make it to the output follow the same relative ordering they had in the input bag.
What if we want to look at which teams a player played for, in order, from the start of his career to the end? DistinctBy gives us a clean way to keep one record per key from a player’s chronologically sorted seasons:
-- Keep the first tuple for each value of field 1 (year_id) in the sorted (team_id, year_id) bag
DEFINE DistinctByYear datafu.pig.bags.DistinctBy('1');
bat_seasons = FOREACH bat_seasons GENERATE
player_id,
year_id,
team_id;
player_teams = FOREACH (GROUP bat_seasons BY player_id) {
sorted = ORDER bat_seasons.(team_id, year_id) BY year_id;
distinct_by_year = DistinctByYear(sorted);
GENERATE
group AS player_id,
BagToString(distinct_by_year, '|');
};
dump @;
...
(zupcibo01,BOS|1991|BOS|1992|BOS|1993|CHA|1994)
(zuvelpa01,ATL|1982|ATL|1983|ATL|1984|ATL|1985|NYA|1986|NYA|1987|CLE|1988|CLE|1989)
(zuverge01,DET|1954|BAL|1955|BAL|1956|BAL|1957|BAL|1958)
(zwilldu01,CHA|1910|CHF|1914|CHF|1915|CHN|1916)
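The key is specified with a string argument in the DEFINE statement, naming the positional index(es) of the key’s fields as a comma-separated list. Here '1' refers to year_id, the second field of the sorted (team_id, year_id) tuples, so the output keeps one record per season. Passing '0' instead would key on team_id and keep only the earliest season for each team.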
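The order-preserving, first-record-per-key behavior described above can be sketched in plain Python. This mirrors the semantics only and is not DataFu's implementation. The helper name `distinct_by` is hypothetical, and the sample rows are taken from the zupcibo01 output line.

```python
def distinct_by(records, key_indexes):
    """Keep the first record seen for each key, preserving input order."""
    seen = set()
    kept = []
    for rec in records:
        key = tuple(rec[i] for i in key_indexes)
        if key not in seen:
            seen.add(key)
            kept.append(rec)
    return kept

# (team_id, year_id) tuples, already sorted by year_id.
seasons = [("BOS", 1991), ("BOS", 1992), ("BOS", 1993), ("CHA", 1994)]

by_year = distinct_by(seasons, [1])  # key on field 1: one record per season
by_team = distinct_by(seasons, [0])  # key on field 0: first season per team
print(by_year)
print(by_team)
```

Keying on field 1 leaves all four seasons in place, while keying on field 0 keeps only each team's first season.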
The DISTINCT operation is useful when you want to eliminate duplicates based on the whole record. But to instead find only the rows whose key has a single record, or only the rows whose key has multiple records, do a GROUP BY and then filter on the size of the resulting bag using COUNT_STAR().
On a broadcast a couple years ago, announcer Tim McCarver paused from his regular delivery of the obvious and the officious to note that second baseman Asdrubal Cabrera "is the only player in the majors with that first name". This raises the question: how many other people in the history of baseball similarly are uniquely yclept [1]? Let’s create a table for the biography site awarding players the "Asdrubal" badge if they are the only one in possession of their first name.
people = FOREACH people GENERATE name_first, name_last, player_id, beg_date, end_date;
by_first_name = GROUP people BY name_first;
unique_first_names = FILTER by_first_name BY COUNT_STAR(people) == 1;
unique_players = FOREACH unique_first_names GENERATE
group AS name_first,
FLATTEN(people.(name_last, player_id, beg_date, end_date));
Which results in some interesting names:
...
(Kristopher,Negron,negrokr01,2012-06-07,\N)
(La Schelle,Tarver,tarvela01,1986-07-12,1986-10-05)
(Mysterious,Walker,walkemy01,1910-06-28,1915-09-29)
(Peek-A-Boo,Veach,veachpe01,1884-08-24,1890-07-25)
(Phenomenal,Smith,smithph01,1884-04-18,1891-06-15)
Our approach should be getting familiar. We group on the key (name_first) and eliminate every group holding more than one record for the key. Since there is only one element in each remaining bag, the FLATTEN statement just acts to push the bag’s fields up into the record itself.
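The group-then-filter-on-count pattern can be sketched in plain Python as follows. This is an illustration of the logic, not of how Pig executes it. The helper name is hypothetical, and the two "John" rows are made-up sample data added to provide a non-unique key.

```python
from collections import defaultdict

def records_with_unique_key(records, key_index):
    """Return the records whose key value appears exactly once."""
    groups = defaultdict(list)
    for rec in records:
        groups[rec[key_index]].append(rec)
    # Keep only single-record groups and lift the lone record out of its bag,
    # much as FLATTEN does for a bag of one.
    return [recs[0] for recs in groups.values() if len(recs) == 1]

people = [
    ("Asdrubal", "Cabrera"),
    ("John", "Doe"),      # made-up row
    ("Honus", "Wagner"),
    ("John", "Smith"),    # made-up row
]
unique_players = records_with_unique_key(people, 0)
print(unique_players)
```

Changing the test to `len(recs) > 1` would instead select the keys that have multiple records.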
There are some amazing names in this list. You might be familiar with Honus Wagner, Eppa Rixey, Boog Powell or Yogi Berra, some of the more famous in the list. But have you heard recounted the diamond exploits of Firpo Mayberry, Zoilo Versalles, Pi Schwert or Bevo LeBourveau? Mul Holland, Sixto Lezcano, Welcome Gaston and Mox McQuery are names that really should come attached to a film noir detective; the villains could choose among Mysterious Walker, The Only Nolan, or Phenomenal Smith for their name. For a good night’s sleep on the couch, tell your spouse that your next child must be named for Urban Shocker, Twink Twining, Pussy Tebeau, Bris Lord, Boob Fowler, Crazy Schmit, Creepy Crespi, Cuddles Marshall, Vinegar Bend Mizell, or Buttercup Dickerson.
Set operations (intersection, union, set difference and so forth) are a valuable strategic formulation for the structural operations we’ve been learning. In terms of set operations, "Which users both clicked on an ad for shirts and bought a shirt?" becomes "find the intersection of the shirt-ad-clickers set with the shirt-buyers set". "What patients either were ill but did not test positive, or tested positive but were not ill?" becomes "find the symmetric difference of the actually-ill patients and the tested-positive patients". The relational logic that powers traditional database engines is, at its core, the algebra of sets. We’ve actually met many of the set operations in certain alternate guises, but set operations are so important it’s worth calling them out specifically.
When we say 'set', we mean an unordered collection of distinct elements. Those elements could be full records, or they could be key fields in a record — allowing us to intersect the shirt-ad-clickers and the shirt-buyers while carrying along information about the ad they clicked on and the shirt they bought.
In the next several sections, you’ll learn how to combine sets in the following ways:
- 'Distinct Union' (A ∪ B): all distinct elements that are in 'A' or in 'B'.
- 'Set Intersection' (A ∩ B): all distinct elements that are in 'A' and also in 'B'.
- 'Set Difference' (A - B): all distinct elements that are in 'A' but are not in 'B'.
- 'Symmetric Difference' (A ^ B): all distinct elements that are in 'A' or in 'B' but not both. Put another way, it’s all distinct elements that are in 'A' but not 'B' as well as all distinct elements that are in 'B' but not 'A'.
- 'Set Equality' (A == B): every element in 'A' is also in 'B', and every element in 'B' is also in 'A'. The result of the set equality operation is a boolean true or false, as opposed to a set as in the above operations.
The following table may help. The rows correspond to the kind of elements that are in both A and B; A but not B; and B but not A. Under the column for each operator, only the kinds of elements marked 'T' will be present in the result.
                                Union   Inters   Diff   Diff   Sym.Diff
Elements in        A     B      A∪B     A∩B      A-B    B-A    A^B
both A and B       T     T      T       T        -      -      -
A but not B        T     -      T       -        T      -      T
B but not A        -     T      T       -        -      T      T
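The same table can be checked with Python's built-in set type, used here purely to illustrate the definitions (the Pig patterns follow in the next sections). The three element labels are placeholders for the three kinds of elements in the table.

```python
# One placeholder element of each kind: in both sets, only in A, only in B.
A = {"both", "only_a"}
B = {"both", "only_b"}

union = A | B          # Distinct Union
intersection = A & B   # Set Intersection
a_minus_b = A - B      # Set Difference
b_minus_a = B - A
sym_diff = A ^ B       # Symmetric Difference
is_equal = (A == B)    # Set Equality: a boolean, not a set

print(sorted(union), sorted(intersection), sorted(sym_diff), is_equal)
```

Each result contains exactly the kinds of elements marked 'T' in the corresponding column.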
The mechanics of working with sets depend on whether the set elements are represented as records in a bag or as rows in a full table. Set operations on bags are particularly straightforward thanks to the purpose-built UDFs in the DataFu package. Set operations on tables are done using a certain COGROUP-and-FILTER combination: wordier, but no more difficult. Let’s start with the patterns that implement set operations on full tables.
To demonstrate full-table set operations, we can relate the set of major US cities [2] with the set of US cities that have hosted a significant number (at least 50) of major-league games. To prove a point about set operations with duplicates, we will leave in the duplicates from the team cities (the Mets and Yankees both claim NY).
main_parks = FILTER parks BY n_games >= 50 AND country_id == 'US';
major_cities = FILTER geonames BY
(feature_class == 'P') AND
(feature_code matches 'PPL.*') AND
(country_code == 'US') AND
(population > 10000);
bball_city_names = FOREACH main_parks GENERATE city;
major_city_names = FOREACH major_cities GENERATE name;
If the only contents of the tables are the set membership keys, finding the distinct union is done how it sounds: apply union, then distinct.
major_or_baseball = DISTINCT (UNION bball_city_names, major_city_names);
For all the other set operations, or when the elements are keys within a record (rather than the full record), we will use some variation on a COGROUP to generate the result.
combined = COGROUP major_cities BY name, main_parks BY city;
major_or_parks = FOREACH combined GENERATE
group AS city,
FLATTEN(FirstTupleFromBag(major_cities.(name, population), ((chararray)NULL,(int)NULL))),
main_parks.park_id AS park_ids;
The DataFu FirstTupleFromBag UDF simplifies things immensely. Since the city name is a unique key for the major_cities table, we know that the major_cities bag has at most a single element. Applying FirstTupleFromBag turns the bag-of-one-tuple into a tuple-of-two-fields, and applying FLATTEN lifts the tuple-of-two-fields into top-level fields for name and for population. When the city key has no match in the major_cities table, the second argument to FirstTupleFromBag forces those fields to have NULL values.
Our output looks like this:
...
(Seaford,Seaford,15294,{})
(Seaside,Seaside,33025,{})
(Seattle,Seattle,608660,{(SEA03),(SEA01),(SEA02)})
...
As we mentioned, there are potentially many park records for each city, and so the main_parks bag can have zero, one or many records. Above, we keep the list of parks around as a single field.
Having used COGROUP on the two datasets, a record lies in the set intersection when neither bag is empty.
combined = COGROUP major_cities BY name, main_parks BY city;
major_and_parks_f = FILTER combined BY
(COUNT_STAR(major_cities) > 0L) AND
(COUNT_STAR(main_parks) > 0L);
major_and_parks = FOREACH major_and_parks_f GENERATE
group AS city,
FLATTEN(FirstTupleFromBag(major_cities.(name, population), ((chararray)NULL,(int)NULL))),
main_parks.park_id AS park_ids;
Two notes. First, we test against COUNT_STAR(bag), and not SIZE(bag) or IsEmpty(bag). Those latter two require actually materializing the bag: all the data is sent to the reducer, and no combiners can be used. Second, since COUNT_STAR returns a value of type long, it’s best to do the comparison against 0L (a long) and not 0 (an int).
Having used COGROUP on the two datasets, set difference means that records lie in A minus B when the second bag is empty, and they lie in B minus A when the first bag is empty.
combined = COGROUP major_cities BY name, main_parks BY city;
major_minus_parks_f = FILTER combined BY (COUNT_STAR(main_parks) == 0L);
major_minus_parks = FOREACH major_minus_parks_f GENERATE
group AS city,
FLATTEN(FirstTupleFromBag(major_cities.(name, population), ((chararray)NULL,(int)NULL))),
main_parks.park_id AS park_ids;
parks_minus_major_f = FILTER combined BY (COUNT_STAR(major_cities) == 0L);
parks_minus_major = FOREACH parks_minus_major_f GENERATE
group AS city,
FLATTEN(FirstTupleFromBag(major_cities.(name, population), ((chararray)NULL,(int)NULL))),
main_parks.park_id AS park_ids;
difference = UNION major_minus_parks, parks_minus_major;
Having used COGROUP on the two datasets, records lie in the symmetric difference when one or the other bag is empty. (We don’t have to test for them both being empty, since there wouldn’t be a row if that were the case.)
combined = COGROUP major_cities BY name, main_parks BY city;
major_xor_parks_f = FILTER combined BY
(COUNT_STAR(major_cities) == 0L) OR (COUNT_STAR(main_parks) == 0L);
major_xor_parks = FOREACH major_xor_parks_f GENERATE
group AS city,
FLATTEN(FirstTupleFromBag(major_cities.(name, population), ((chararray)NULL,(int)NULL))),
main_parks.park_id AS park_ids;
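The COGROUP-and-FILTER patterns above all reduce to one idea: build a pair of bags per key, then test which bag is empty. The following plain-Python sketch mirrors that logic. The `cogroup` helper is a hypothetical stand-in for Pig's operator, and the "Smallville" park row is made-up sample data.

```python
from collections import defaultdict

def cogroup(left, right, left_key, right_key):
    """Group two record lists by key; each key maps to (left_bag, right_bag)."""
    groups = defaultdict(lambda: ([], []))
    for rec in left:
        groups[left_key(rec)][0].append(rec)
    for rec in right:
        groups[right_key(rec)][1].append(rec)
    return dict(groups)

cities = [("Seattle", 608660), ("Seaford", 15294)]
parks = [("SEA01", "Seattle"), ("SEA02", "Seattle"), ("XXX01", "Smallville")]
groups = cogroup(cities, parks, lambda c: c[0], lambda p: p[1])

# Intersection: neither bag is empty.
both = sorted(k for k, (c, p) in groups.items() if c and p)
# Set differences: exactly one particular bag is empty.
cities_minus_parks = sorted(k for k, (c, p) in groups.items() if c and not p)
parks_minus_cities = sorted(k for k, (c, p) in groups.items() if p and not c)
# Symmetric difference: one bag or the other is empty.
either_not_both = sorted(k for k, (c, p) in groups.items() if not c or not p)

print(both, cities_minus_parks, parks_minus_cities, either_not_both)
```

Duplicate keys on one side (two Seattle parks) simply land in the same bag, so they do not produce duplicate keys in any result.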
Set Equality indicates whether the elements of each set are identical. Here, it would tell us whether the set of keys in the major_cities table and the set of keys in the main_parks table were identical.
There are several ways to determine full-table set equality, but likely the most efficient is to see whether the two sets' symmetric difference is empty. An empty symmetric difference implies that every element of 'A' is in 'B', and that every element of 'B' is in 'A' — which is exactly what it means for two sets to be equal.
Properly testing whether a table is empty is a bit more fiddly than you’d think. To illustrate the problem, first whip up a set that should compare as equal to the major_cities table, run the symmetric difference stanza from above, and then test whether the table is empty:
major_cities_also = FOREACH major_cities GENERATE name;
major_xor_major = FILTER
(COGROUP major_cities BY name, major_cities_also BY name)
BY ((COUNT_STAR(major_cities) == 0L) OR (COUNT_STAR(major_cities_also) == 0L));
-- Does not work
major_equals_major_fail = FOREACH (GROUP major_xor_major ALL) GENERATE
(COUNT_STAR(major_xor_major) == 0L ? 1 : 0) AS is_equal;
The last statement of the code block attempts to measure whether the count of records in major_xor_major is zero. And if the two tables were unequal, this would have worked. But major_xor_major is empty and so the FOREACH has no lines to operate on. The output file is not a 1 as you’d expect; it’s an empty file.
Our integer table to the rescue! Actually we’ll use her baby brother 'one_line.tsv': it has one record, with fields uno (value 1) and zilch (value 0). Instead of a GROUP..ALL, do a COGROUP of one_line on a constant value 1. Since there is exactly one possible value for the group key, there will be exactly one row in the output.
-- Does work, using "1\t0" table
one_line = LOAD '/data/gold/one_line.tsv' AS (uno:int, zilch:int);
-- will be `1` (true)
major_equals_major = FOREACH (COGROUP one_line BY 1, major_xor_major BY 1) GENERATE
  (COUNT_STAR(major_xor_major) == 0L ? 1 : 0) AS is_equal;
-- will be `0` (false)
major_equals_parks = FOREACH (COGROUP one_line BY 1, major_xor_parks BY 1) GENERATE
  (COUNT_STAR(major_xor_parks) == 0L ? 1 : 0) AS is_equal;
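Why the one-line table helps can be seen in a small plain-Python model of the two approaches. Both helpers are hypothetical simplifications of GROUP..ALL and of a COGROUP on a constant key, written only to show when an output row exists at all.

```python
def group_all(rows):
    """Mimic GROUP ... ALL: an empty input yields no group at all."""
    return [("all", list(rows))] if rows else []

def cogroup_on_constant(*relations):
    """Mimic COGROUP r1 BY 1, r2 BY 1: one group as long as any input has a row."""
    if not any(relations):
        return []
    return [(1, *[list(rel) for rel in relations])]

one_line = [(1, 0)]            # the single (uno, zilch) record
empty_xor = []                 # symmetric difference of two equal sets
nonempty_xor = [("Seaford",)]  # symmetric difference of two unequal sets

# Naive test: no group exists, so no answer is produced.
naive = [1 if not bag else 0 for _key, bag in group_all(empty_xor)]
# Robust test: one_line guarantees a group, so an answer always comes out.
robust_equal = [1 if not bag else 0
                for _key, _one, bag in cogroup_on_constant(one_line, empty_xor)]
robust_unequal = [1 if not bag else 0
                  for _key, _one, bag in cogroup_on_constant(one_line, nonempty_xor)]

print(naive, robust_equal, robust_unequal)
```

The naive version returns an empty list for equal sets, while the version anchored by the one-line table returns a single 1 or 0 in every case.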
To demonstrate set operations on grouped records, let’s look at the year-to-year churn of mainstay players on each team.
Other applications of the procedure we follow here would include analyzing how the top-10 products on a website change over time, or identifying sensors that report values over threshold in N consecutive hours (by using an N-way COGROUP).
To construct a sequence of sets, perform a self-COGROUP that collects the elements from each sequence key into one bag and the elements from the next key into another bag. Here, we group together the roster of players for a team’s season (that is, players with a particular team_id and year_id) together with the roster of players from the following season (players with the same team_id and the subsequent year_id).
Since it’s a self-COGROUP, we must do a dummy projection to make new aliases (see the earlier section on self-join for details).
sig_seasons = FILTER bat_seasons BY
  ((year_id >= 1900) AND (lg_id == 'NL' OR lg_id == 'AL') AND (PA >= 450));
y1 = FOREACH sig_seasons GENERATE player_id, team_id, year_id;
y2 = FOREACH sig_seasons GENERATE player_id, team_id, year_id;
-- Put each team of players in context with the next year's team of players
year_to_year_players = COGROUP
  y1 BY (team_id, year_id),
  y2 BY (team_id, year_id-1);
-- Clear away the grouped-on fields
rosters = FOREACH year_to_year_players GENERATE
  group.team_id AS team_id,
  group.year_id AS year_id,
  y1.player_id AS pl1,
  y2.player_id AS pl2;
-- The first and last years of existence don't have anything interesting to compare,
-- so keep only the rows where both bags are non-empty
rosters = FILTER rosters BY NOT (COUNT_STAR(pl1) == 0L OR COUNT_STAR(pl2) == 0L);
The content of rosters is a table with two key columns, team and year, and two bags: the set of players from that year and the set of players from the following year.
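The self-COGROUP trick of filing each record under both its own key and the previous year's key can be sketched in plain Python. The function name and the player IDs p1 to p3 are hypothetical, and this sketch keeps only the keys for which both rosters are non-empty.

```python
from collections import defaultdict

def year_to_year_rosters(seasons):
    """Pair each (team, year) roster with the same team's roster a year later."""
    this_year = defaultdict(set)
    next_year = defaultdict(set)
    for player, team, year in seasons:
        this_year[(team, year)].add(player)
        # Filing under year - 1 lines this record up with the prior season.
        next_year[(team, year - 1)].add(player)
    # Keep only keys present on both sides, so neither roster is empty.
    keys = this_year.keys() & next_year.keys()
    return {k: (this_year[k], next_year[k]) for k in sorted(keys)}

seasons = [
    ("p1", "BOS", 1991), ("p2", "BOS", 1991),
    ("p1", "BOS", 1992), ("p3", "BOS", 1992),
]
rosters = year_to_year_rosters(seasons)
print(rosters)
```

The 1992 roster has no following season in this sample, so only the (BOS, 1991) row survives.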
Applying the set operations lets us describe the evolution of the team from year to year.
DEFINE SetUnion datafu.pig.sets.SetUnion();
DEFINE SetIntersect datafu.pig.sets.SetIntersect();
DEFINE SetDifference datafu.pig.sets.SetDifference();
roster_changes_y2y = FOREACH rosters {
  -- Distinct Union (doesn't need pre-sorting)
  either_year = SetUnion(pl1, pl2);
  -- The other operations require sorted bags.
  pl1_o = ORDER pl1 BY player_id;
  pl2_o = ORDER pl2 BY player_id;
  -- Set Intersection
  stayed = SetIntersect(pl1_o, pl2_o);
  -- Set Difference
  y1_departed = SetDifference(pl1_o, pl2_o);
  y2_arrived = SetDifference(pl2_o, pl1_o);
  -- Symmetric Difference
  non_stayed = SetUnion(y1_departed, y2_arrived);
  -- Set Equality
  is_equal = ( (COUNT_STAR(non_stayed) == 0L) ? 1 : 0);
  GENERATE
    year_id, team_id,
    either_year, stayed, y1_departed, y2_arrived, non_stayed, is_equal;
};
The Distinct Union, A union B, describes players on the roster in either year of our two-year span. We’ll find it using the DataFu SetUnion UDF.
either_year = SetUnion(pl1, pl2);
All the DataFu set operations here tolerate inputs containing duplicates, and all of them return bags that contain no duplicates. They also each accept two or more bags, enabling you to track sequences longer than two adjacent elements.
As opposed to SetUnion, the other set operations require sorted inputs. That’s not as big a deal as if we were operating on a full table, since a nested ORDER BY makes use of Hadoop’s secondary sort. As long as the input and output bags fit efficiently in memory, these operations are efficient.
pl1_o = ORDER pl1 BY player_id;
pl2_o = ORDER pl2 BY player_id;
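One way to see why sorted inputs are useful is the classic merge-style intersection, which walks both inputs once without building a lookup table. The sketch below is a generic illustration of that technique in plain Python. It is an assumption that this resembles what the UDFs do internally, and it is not DataFu's code.

```python
def sorted_intersect(xs, ys):
    """Intersect two sorted lists in one pass, emitting each match once."""
    i = j = 0
    out = []
    while i < len(xs) and j < len(ys):
        if xs[i] == ys[j]:
            # Skip repeats so the result contains no duplicates.
            if not out or out[-1] != xs[i]:
                out.append(xs[i])
            i += 1
            j += 1
        elif xs[i] < ys[j]:
            i += 1
        else:
            j += 1
    return out

stayed = sorted_intersect(["a", "b", "b", "d"], ["b", "b", "c", "d"])
print(stayed)
```

If either input were unsorted, the pointers could step past a match, which is why the ordering step comes first.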
The Set Intersection (A intersect B) describes the players that played in the first year and also stayed to play in the second year. We’ll find the set intersection using the DataFu SetIntersect UDF.
stayed = SetIntersect(pl1_o, pl2_o);
The Set Difference (A minus B) contains the elements in the first bag that are not present in the remaining bags. The first line therefore describes players that did not stay for the next year, and the second describes players that newly arrived in the next year. The DataFu SetDifference UDF comes in handy:
y1_departed = SetDifference(pl1_o, pl2_o);
y2_arrived = SetDifference(pl2_o, pl1_o);
The Symmetric Difference contains all elements that are in one set or the other but not both. You can find this using either (A minus B) union (B minus A), which is players who either departed after the first year or newly arrived in the next year, or (A union B) minus (A intersect B), which is players who were present in either season but not both seasons.
non_stayed = SetUnion(y1_departed, y2_arrived);
Set Equality indicates whether the elements of each set are identical — here, it selects seasons where the core set of players remained the same. There’s no direct function for set equality, but you can repurpose any of the set operations to serve.
If A and B each have no duplicate records, then A and B are equal if and only if any one of the following holds:
- size(A) == size(B) AND size(A union B) == size(A)
- size(A) == size(B) AND size(A intersect B) == size(A)
- size(A) == size(B) AND size(A minus B) == 0
- size(symmetric difference(A,B)) == 0
For multiple sets of distinct elements, A, B, C… are equal if and only if all the sets and their intersection have the same size:
size(intersect(A,B,C,…)) == size(A) == size(B) == size(C) == …
If you’re already calculating one of the functions, use the test that reuses its result. Otherwise, prefer the A minus B test if most rows will have equal sets, and the A intersect B test if most will not or if there are multiple sets.
is_equal = ( (COUNT_STAR(non_stayed) == 0L) ? 1 : 0);
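The size-based equality tests listed above can be checked against each other with Python's built-in sets. This is a sketch for verifying the logic on duplicate-free inputs. The function name and the sample rosters are hypothetical.

```python
def equality_tests(a, b):
    """Evaluate each size-based equality test on two duplicate-free sets."""
    a, b = set(a), set(b)
    return {
        "union": len(a) == len(b) and len(a | b) == len(a),
        "intersect": len(a) == len(b) and len(a & b) == len(a),
        "minus": len(a) == len(b) and len(a - b) == 0,
        "symdiff": len(a ^ b) == 0,
    }

same = equality_tests({"p1", "p2"}, {"p2", "p1"})
different = equality_tests({"p1", "p2"}, {"p1", "p3"})
subset = equality_tests({"p1"}, {"p1", "p2"})
print(same, different, subset)
```

All four tests agree in every case, including the subset case where the size comparison is what rules out equality.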
That wraps up our chapter on Uniquing and Set Operations. We started with simple definitions of unique and distinct, showed you how to make relations and then groups unique and how to find unique records, and finally took a tour of set operations both on relations and between groups. With these tools in hand you can work with unique sets of values to create ontologies, tags, and curated datasets. You can also use the set operations to combine different datasets with complex semantics, beyond the simple joins we covered in Chapter 6.
We started with the basics: LOAD, STORE, LIMIT, DESCRIBE. Then we showed you map-only operations where you learned to FILTER, SPLIT, and FOREACH/GENERATE. Then we introduced grouping via the GROUP BY operation. Next we showed you how to JOIN and COGROUP. Next we covered sorting and introduced ORDER BY. Finally we taught you DISTINCT and set operations.
This completes our presentation of analytic patterns in Pig. Congratulations! By now you should have an able toolkit of techniques fit for attacking any data-processing problem you encounter. You may use this book as a reference as you go forth and process data at scale.