During training, a certain proportion of middle-layer nodes from the … #3

Merged: 4 commits, Oct 8, 2024
5 changes: 5 additions & 0 deletions docs/source/models/tdm.md
@@ -113,6 +113,11 @@ model_config {
- layer_num_sample: number of negative samples drawn at each level of the tree for every sample
- item_id_field: column name of the item_id
- attr_delimiter: tables are currently written with ",", so this must be set to ","
- remain_ratio: (optional, default 1.0) fraction of the tree's non-leaf levels randomly selected for training within each batch; keeping fewer levels can help prevent overfitting
- probability_type: (optional, default "UNIFORM") per-level selection probability used when randomly choosing the non-leaf levels; currently one of "UNIFORM", "ARITHMETIC", "RECIPROCAL" (see the sketch below)
  - "UNIFORM": every level is selected with the same probability
  - "ARITHMETIC": the selection probability increases arithmetically with the level
  - "RECIPROCAL": the selection probability increases as a reciprocal function of the remaining depth, i.e. p is proportional to 1/(TREE_LEVEL - CUR_LEVEL)

### Example Config

59 changes: 53 additions & 6 deletions tzrec/datasets/dataset.py
@@ -99,26 +99,73 @@ def _expand_tdm_sample(
sampler_config = getattr(data_config, sampler_type)
tree_level = len(sampler_config.layer_num_sample)
num_all_layer_neg = sum(sampler_config.layer_num_sample)
layer_num_sample = sampler_config.layer_num_sample

item_fea_names = pos_sampled.keys()
all_fea_names = input_data.keys()
label_fields = set(data_config.label_fields)
user_fea_names = all_fea_names - item_fea_names - label_fields

remain_ratio = sampler_config.remain_ratio
if remain_ratio < 1.0:
    probability_type = sampler_config.probability_type
    if probability_type == "UNIFORM":
        # every non-leaf level is equally likely to be kept
        p = np.array([1 / (tree_level - 1)] * (tree_level - 1))
    elif probability_type == "ARITHMETIC":
        # selection probability grows linearly with the level index
        p = np.arange(1, tree_level) / sum(np.arange(1, tree_level))
    elif probability_type == "RECIPROCAL":
        # p is proportional to 1 / (tree_level - cur_level)
        p = 1 / np.arange(tree_level - 1, 0, -1)
        p = p / sum(p)
    else:
        raise ValueError(
            f"probability_type: [{probability_type}] is not supported now."
        )
    remain_layer = np.random.choice(
        range(tree_level - 1),
        int(remain_ratio * (tree_level - 1)),
        replace=False,
        p=p,
    )
    remain_layer.sort()
else:
    # keep every non-leaf level; use an ndarray so the advanced
    # indexing below (remain_layer[None, :]) also works in this branch
    remain_layer = np.arange(tree_level - 1)

num_remain_layer_neg = (
    sum([layer_num_sample[i] for i in remain_layer]) + layer_num_sample[-1]
)
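# e.g. with layer_num_sample = [1, 1, 1, 1, 1, 5] and remain_layer = [1, 3],
# num_remain_layer_neg = (1 + 1) + 5 = 7 negatives are kept per sample;
# the leaf level (the last entry) is always kept.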
for item_fea_name in item_fea_names:
    batch_size = len(input_data[item_fea_name])
    pos_index = (
        (remain_layer[None, :] + (tree_level - 1) * np.arange(batch_size)[:, None])
        .flatten()
        .astype(np.int64)
    )
    neg_index = (
        (
            np.concatenate(
                [
                    range(sum(layer_num_sample[:i]), sum(layer_num_sample[: i + 1]))
                    for i in np.append(remain_layer, tree_level - 1)
                ]
            )[None, :]
            + num_all_layer_neg * np.arange(batch_size)[:, None]
        )
        .flatten()
        .astype(np.int64)
    )
    input_data[item_fea_name] = pa.concat_arrays(
        [
            input_data[item_fea_name],
            pos_sampled[item_fea_name].take(pos_index),
            neg_sampled[item_fea_name].take(neg_index),
        ]
    )

# In the sampling results, the sampled outcomes for each item are contiguous.
for user_fea_name in user_fea_names:
    user_fea = input_data[user_fea_name]
    pos_index = np.repeat(np.arange(len(user_fea)), len(remain_layer))
    neg_index = np.repeat(np.arange(len(user_fea)), num_remain_layer_neg)
    pos_expand_user_fea = user_fea.take(pos_index)
    neg_expand_user_fea = user_fea.take(neg_index)
    input_data[user_fea_name] = pa.concat_arrays(
@@ -134,11 +181,11 @@ def _expand_tdm_sample(
    [
        input_data[label_field].cast(pa.int64()),
        pa.array(
            [1] * (len(input_data[label_field]) * len(remain_layer)),
            type=pa.int64(),
        ),
        pa.array(
            [0] * (len(input_data[label_field]) * num_remain_layer_neg),
            type=pa.int64(),
        ),
    ]
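
To make the index arithmetic in `_expand_tdm_sample` concrete, here is a small self-contained sketch; the shapes mirror the test below, `remain_layer = [1, 3]` is one possible draw, and all names are local to this example:

```python
import numpy as np

tree_level = 6
layer_num_sample = [1, 1, 1, 1, 1, 5]
num_all_layer_neg = sum(layer_num_sample)  # 10 negatives per sample in total
remain_layer = np.array([1, 3])            # one possible draw of kept levels
batch_size = 2

# Positives: each sample owns (tree_level - 1) contiguous ancestor rows;
# keep only the rows belonging to the retained levels.
pos_index = (
    remain_layer[None, :] + (tree_level - 1) * np.arange(batch_size)[:, None]
).flatten()
print(pos_index)  # [1 3 6 8]

# Negatives: the rows of each retained level plus the whole leaf level,
# shifted by num_all_layer_neg for every further sample in the batch.
neg_index = (
    np.concatenate(
        [
            range(sum(layer_num_sample[:i]), sum(layer_num_sample[: i + 1]))
            for i in np.append(remain_layer, tree_level - 1)
        ]
    )[None, :]
    + num_all_layer_neg * np.arange(batch_size)[:, None]
).flatten()
print(neg_index)  # [ 1  3  5  6  7  8  9 11 13 15 16 17 18 19]
```

Each sample thus contributes 2 positive and 7 negative rows instead of 5 and 10, which is exactly what the label expansion at the end of the function accounts for.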
114 changes: 114 additions & 0 deletions tzrec/datasets/dataset_test.py
@@ -10,6 +10,7 @@
# limitations under the License.


import math
import tempfile
import unittest
from typing import Any, Dict, Iterator, List, Optional
@@ -426,6 +427,119 @@ def test_dataset_predict_mode(self, debug_level):
else:
    self.assertEqual(list(batch.reserves.get().column_names), ["label"])

def test_dataset_with_tdm_sampler_and_remain_ratio(self):
    node = tempfile.NamedTemporaryFile("w")
    self._temp_files.append(node)
    node.write("id:int64\tweight:float\tattrs:string\n")
    for i in range(63):
        node.write(f"{i}\t{1}\t{int(math.log(i+1,2))}:{i}:{i+1000}:我们{i}\n")
    node.flush()

    def _ancestor(code):
        ancs = []
        while code > 0:
            code = int((code - 1) / 2)
            ancs.append(code)
        return ancs

    edge = tempfile.NamedTemporaryFile("w")
    self._temp_files.append(edge)
    edge.write("src_id:int64\tdst_id:int\tweight:float\n")
    for i in range(31, 63):
        for anc in _ancestor(i):
            edge.write(f"{i}\t{anc}\t{1.0}\n")
    edge.flush()

    def _children(code):
        return [2 * code + 1, 2 * code + 2]

    predict_edge = tempfile.NamedTemporaryFile("w")
    self._temp_files.append(predict_edge)
    predict_edge.write("src_id:int64\tdst_id:int\tweight:float\n")
    for i in range(7, 15):
        predict_edge.write(f"0\t{i}\t{1}\n")
    for i in range(7, 31):
        for child in _children(i):
            predict_edge.write(f"{i}\t{child}\t{1}\n")
    predict_edge.flush()
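
    # The fixtures above describe a complete binary tree with 63 nodes:
    # levels 0-5, with leaves 31-62. "edge" connects each leaf to all of
    # its ancestors, while "predict_edge" connects the root to the level-3
    # nodes and each node from level 3 down to its children.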

    input_fields = [
        pa.field(name="int_a", type=pa.int64()),
        pa.field(name="float_b", type=pa.float64()),
        pa.field(name="str_c", type=pa.string()),
        pa.field(name="int_d", type=pa.int64()),
        pa.field(name="float_d", type=pa.float64()),
        pa.field(name="label", type=pa.int32()),
    ]
    feature_cfgs = [
        feature_pb2.FeatureConfig(
            id_feature=feature_pb2.IdFeature(feature_name="int_a")
        ),
        feature_pb2.FeatureConfig(
            id_feature=feature_pb2.IdFeature(feature_name="str_c")
        ),
        feature_pb2.FeatureConfig(
            raw_feature=feature_pb2.RawFeature(feature_name="float_b")
        ),
        feature_pb2.FeatureConfig(
            id_feature=feature_pb2.IdFeature(feature_name="int_d")
        ),
        feature_pb2.FeatureConfig(
            raw_feature=feature_pb2.RawFeature(feature_name="float_d")
        ),
    ]
    features = create_features(
        feature_cfgs, neg_fields=["int_a", "float_b", "str_c"]
    )

    dataset = _TestDataset(
        data_config=data_pb2.DataConfig(
            batch_size=4,
            dataset_type=data_pb2.DatasetType.OdpsDataset,
            fg_encoded=True,
            label_fields=["label"],
            tdm_sampler=sampler_pb2.TDMSampler(
                item_input_path=node.name,
                edge_input_path=edge.name,
                predict_edge_input_path=predict_edge.name,
                attr_fields=["tree_level", "int_a", "float_b", "str_c"],
                item_id_field="int_a",
                layer_num_sample=[1, 1, 1, 1, 1, 5],
                field_delimiter=",",
                remain_ratio=0.4,
                probability_type="UNIFORM",
            ),
        ),
        features=features,
        input_path="",
        input_fields=input_fields,
    )

    dataset.launch_sampler_cluster(2)
    dataloader = DataLoader(
        dataset=dataset,
        batch_size=None,
        num_workers=2,
        pin_memory=True,
        collate_fn=lambda x: x,
    )

    # With tree_level=6 and remain_ratio=0.4, int(0.4 * 5) = 2 non-leaf
    # levels are kept, so each of the 4 input rows expands to
    # 1 original + 2 positive + (2 * 1 + 5) negative rows = 10 rows,
    # i.e. 40 rows and 40 * 3 = 120 sparse values per batch.
    iterator = iter(dataloader)
    batch = next(iterator)
    self.assertEqual(
        batch.dense_features[BASE_DATA_GROUP].keys(), ["float_b", "float_d"]
    )
    self.assertEqual(batch.dense_features[BASE_DATA_GROUP].values().size(), (40, 2))
    self.assertEqual(
        batch.sparse_features[BASE_DATA_GROUP].keys(),
        ["int_a", "str_c", "int_d"],
    )
    self.assertEqual(batch.sparse_features[BASE_DATA_GROUP].values().size(), (120,))
    self.assertEqual(
        batch.sparse_features[BASE_DATA_GROUP].lengths().size(), (120,)
    )
    self.assertEqual(batch.labels["label"].size(), (40,))


if __name__ == "__main__":
    unittest.main()
6 changes: 6 additions & 0 deletions tzrec/protos/sampler.proto
@@ -130,4 +130,10 @@ message TDMSampler {
optional uint32 num_eval_sample = 8 [default=0];
// field delimiter of input data
optional string field_delimiter = 9;
// During training, only a randomly selected proportion of the
// nodes in the middle (non-leaf) layers of the tree is trained
optional float remain_ratio = 10 [default=1.0];
// The probability distribution used to select which middle
// layers of the tree are retained
optional string probability_type = 11 [default="UNIFORM"];
}
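
As a quick sanity check, a sketch assuming the generated `sampler_pb2` Python module (not part of this PR): with the defaults, every non-leaf level is kept and the probability distribution is never consulted.

```python
from tzrec.protos import sampler_pb2

# Defaults: remain_ratio = 1.0 keeps all non-leaf levels, so
# probability_type ("UNIFORM") never actually influences sampling.
cfg = sampler_pb2.TDMSampler(layer_num_sample=[1, 1, 1, 1, 1, 5])
assert cfg.remain_ratio == 1.0
assert cfg.probability_type == "UNIFORM"
```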