
Internal


This document contains some low-level implementation notes. The contents were written during the development of the first version.

Normal Tangram users can ignore this document.

Checkpoint

Users can specify the checkpoint interval and path for a plan using ->SetCheckpointInterval(interval, path) in the plan configuration.
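A minimal sketch of how the interval and path might be set on a plan. Only SetCheckpointInterval(interval, path) comes from the actual API; the Plan type, its fields, and the example path below are illustrative placeholders, not the real Tangram classes.

#include <string>

// Illustrative stand-in for the plan object; the real class has more fields.
struct Plan {
  int checkpoint_interval = 0;   // checkpoint every N iterations (0 = disabled)
  std::string checkpoint_path;   // directory (e.g. on HDFS) to write checkpoints to

  Plan* SetCheckpointInterval(int interval, const std::string& path) {
    checkpoint_interval = interval;
    checkpoint_path = path;
    return this;
  }
};

int main() {
  Plan plan;
  // Checkpoint every 5 iterations under a hypothetical HDFS path.
  plan.SetCheckpointInterval(5, "/tmp/tangram/cp/pagerank");
  return 0;
}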

There are two types of checkpointing methods.

Sync checkpoint

In sync checkpoint, a version is completed only when every partition has been successfully checkpointed.

In this mode, when a partition's join version is updated, the worker checks whether the partition needs a checkpoint. If so, it checkpoints the partition and then notifies the control_manager.

As expected, an iteration that performs a checkpoint takes longer to finish.
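The sketch below illustrates the sync checkpoint path described above, under stated assumptions: the WriteCheckpoint and NotifyControlManager helpers and the interval-boundary check are hypothetical stand-ins for the real HDFS write and control_manager message.

#include <iostream>
#include <string>

// Hypothetical hooks; the real system writes to HDFS and messages the control_manager.
void WriteCheckpoint(int part_id, int version, const std::string& path) {
  std::cout << "checkpoint part " << part_id << " at version " << version
            << " to " << path << "\n";
}
void NotifyControlManager(int part_id, int version) {
  std::cout << "notify control_manager: part " << part_id
            << " checkpointed at version " << version << "\n";
}

// Called when a partition's join version is updated (sync checkpoint path).
void OnJoinVersionUpdated(int part_id, int new_version,
                          int checkpoint_interval, const std::string& cp_path) {
  // Assume a checkpoint is needed on versions that fall on the interval boundary.
  bool need_checkpoint =
      checkpoint_interval > 0 && new_version % checkpoint_interval == 0;
  if (need_checkpoint) {
    WriteCheckpoint(part_id, new_version, cp_path);  // blocks this iteration
    NotifyControlManager(part_id, new_version);      // version completes once all parts notify
  }
}

int main() {
  OnJoinVersionUpdated(/*part_id=*/3, /*new_version=*/10,
                       /*checkpoint_interval=*/5, "/tmp/cp");
  return 0;
}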

Background checkpoint (BGCP)

The checkpoint is conducted in the background.

The cost of checkpointing is amortized over later iterations (checkpointing consumes disk and network bandwidth).

Problems: A checkpoint may take much longer to finish than the checkpoint interval. Because of this delay, multiple checkpoints may be running at the same time.
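A minimal sketch of the background behaviour and the overlap problem, assuming a detached worker thread per checkpoint; the names and the in-flight counter are illustrative, not the real implementation.

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

// Number of checkpoints currently running in the background (illustrative).
std::atomic<int> in_flight_checkpoints{0};

// BGCP: the write happens off the critical path, so the iteration continues
// while disk/network bandwidth is consumed in the background.
void CheckpointInBackground(int part_id, int version) {
  in_flight_checkpoints++;
  std::thread([part_id, version] {
    // Simulate a slow checkpoint; if it outlasts the checkpoint interval,
    // the next checkpoint starts before this one finishes and the two overlap.
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    std::cout << "part " << part_id << " version " << version << " checkpointed; "
              << in_flight_checkpoints.load() << " checkpoint(s) in flight\n";
    in_flight_checkpoints--;
  }).detach();
}

int main() {
  CheckpointInBackground(0, 5);
  CheckpointInBackground(0, 10);  // may overlap with the previous checkpoint
  std::this_thread::sleep_for(std::chrono::milliseconds(500));
  return 0;
}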

Fault Tolerance

Fault recovery.

There are two types of recovery.

Some write partitions lost

Solution: Rerun the plan from the last checkpoint (sketched after the list below).

  1. Abort the current plan.
  2. Read all the write (mutable) partitions from the last checkpoint.
  3. Read the read-only (immutable) partitions partially: only the lost partitions need to be reloaded.
  4. Update the collection map.
  5. Restart the plan from the last checkpoint.
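The following sketch mirrors the five steps above. All helper names are hypothetical stand-ins; in the real system these steps are driven by the scheduler and the checkpoints are read from HDFS.

#include <iostream>
#include <set>

// Hypothetical helpers, one per recovery step.
void AbortCurrentPlan()                        { std::cout << "abort current plan\n"; }
void LoadWritePartitionsFromCheckpoint(int cp) { std::cout << "load all write (mutable) partitions from checkpoint " << cp << "\n"; }
void ReloadReadPartition(int part_id)          { std::cout << "reload lost read-only partition " << part_id << "\n"; }
void UpdateCollectionMap()                     { std::cout << "update collection map\n"; }
void RestartPlanFrom(int cp)                   { std::cout << "restart plan from checkpoint " << cp << "\n"; }

void RecoverFromCheckpoint(int last_checkpoint, const std::set<int>& lost_read_parts) {
  AbortCurrentPlan();                                  // 1. abort the running plan
  LoadWritePartitionsFromCheckpoint(last_checkpoint);  // 2. all mutable partitions
  for (int part_id : lost_read_parts)                  // 3. only the lost immutable partitions
    ReloadReadPartition(part_id);
  UpdateCollectionMap();                               // 4. new partition -> node mapping
  RestartPlanFrom(last_checkpoint);                    // 5. rerun from the checkpoint
}

int main() {
  RecoverFromCheckpoint(/*last_checkpoint=*/20, /*lost_read_parts=*/{2, 7});
  return 0;
}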

Example: PageRank.

This is similar to the PageRank recovery described in the Spark RDD paper. For iterative algorithms such as PageRank, the lineage is long, so Spark also relies on checkpointing.

Benefits: Only all partitions of the write (mutable) collection and the lost partitions of the read (immutable) collection need to be read.

Limitations: The recovery needs to abort the currently running plan. In most cases a running plan can be aborted, since the map, fetch, and join tasks will eventually finish. Also, with background checkpoint, if the plan is aborted and the checkpoint (CP) is not finished, the CP will not be started. However, if a map task is fetching a remote partition/objects, it will block and can never be terminated. This is a bug to be fixed.

No write partitions lost

Solution: Reassign the lost map partitions evenly to the live nodes and rerun the lost map partitions. There is no need to go back to the last checkpoint.
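A small sketch of one way to spread the lost map partitions evenly, assuming a simple round-robin assignment; the function name and the node/partition ids are illustrative, not taken from the scheduler code.

#include <iostream>
#include <map>
#include <vector>

// Hypothetical even reassignment: spread the lost map partitions over the live
// nodes round-robin, then rerun only those map partitions (no checkpoint rollback).
std::map<int, std::vector<int>> ReassignLostPartitions(
    const std::vector<int>& lost_parts, const std::vector<int>& live_nodes) {
  std::map<int, std::vector<int>> assignment;  // node id -> partitions to rerun
  for (size_t i = 0; i < lost_parts.size(); ++i) {
    int node = live_nodes[i % live_nodes.size()];
    assignment[node].push_back(lost_parts[i]);
  }
  return assignment;
}

int main() {
  auto assignment = ReassignLostPartitions({4, 9, 13, 21}, {0, 1, 2});
  for (const auto& [node, parts] : assignment) {
    std::cout << "node " << node << " reruns map partitions:";
    for (int p : parts) std::cout << " " << p;
    std::cout << "\n";
  }
  return 0;
}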

Example: K-means, where the model collection does not have many partitions.

This is similar to the Spark fault-tolerance experiments, where the model is stored on the driver side, so a failed node does not affect the model.

Limitations: All map versions for the lost partitions will be reset to min_version, meaning that some maps will be rerun. This is because, even if a map partition's version has been updated in the control_manager, the map_output may not have been sent out successfully, since the map_output first goes to the delayed_combiner.

So it is fine for a map to be launched twice, and the 'ignoring...' message is normal in this case.

Commits: https://github.com/Yuzhen11/xyz/commit/e87d2a055105e6b208fa23ea581bed38d1772ea4 on 2018.4.15

Load Balance

We assume the map collection equals the join collection; other cases are relatively easy to handle.

General Procedures

  1. The scheduler decides the migration scheme. If a partition is going to be migrated, the scheduler first tells every worker to load the checkpoint from HDFS. (The whole flow is sketched after this list.)

  2. After confirming that every worker has loaded the checkpoint, the scheduler tells every worker to update the collection map.

  3. After confirmation, the scheduler sends messages to start the migration.

  4. On receiving the start message from the scheduler, all workers send a FlushAll message to the original worker. The original worker waits until the join and fetch tasks for the migrating partition finish, and sends the partition and cached messages to the new worker once it has received all the FlushAll messages.

  5. The new worker receives the partition and cached messages from the original worker, tries to start work for this partition, and tells the scheduler that the migration is done.
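A scheduler-side sketch of the five steps above, under stated assumptions: each function stands for a round of messages plus the corresponding confirmations, and all names are hypothetical rather than the actual message types.

#include <iostream>

// Hypothetical scheduler-side rounds; each call blocks until all workers confirm.
void TellWorkersToLoadCheckpoint(int part_id) { std::cout << "all workers load checkpoint for part " << part_id << " from hdfs\n"; }
void TellWorkersToUpdateCollectionMap()       { std::cout << "all workers update collection map\n"; }
void SendStartMigration(int part_id)          { std::cout << "start migration of part " << part_id << "\n"; }

// Worker-side reactions (also hypothetical, summarized as comments):
//  - every worker sends FlushAll to the original worker on receiving the start message;
//  - the original worker waits for the join/fetch tasks of the migrating partition,
//    then ships the partition plus cached messages to the new worker;
//  - the new worker installs the partition, processes the cached messages,
//    tries to start work, and reports completion to the scheduler.
void MigratePartition(int part_id, int from_node, int to_node) {
  std::cout << "migrate part " << part_id << " from node " << from_node
            << " to node " << to_node << "\n";
  TellWorkersToLoadCheckpoint(part_id);  // step 1
  TellWorkersToUpdateCollectionMap();    // step 2
  SendStartMigration(part_id);           // step 3; steps 4-5 happen on the workers
}

int main() {
  MigratePartition(/*part_id=*/7, /*from_node=*/2, /*to_node=*/5);
  return 0;
}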

A Special Case

An intelligent scheme could decide when and how to migrate partitions from a slow node to another node, though at present we just manually migrate some specific partitions. Because it is uncertain which stage the migrating partition has reached in the execution thread pool on the original node, two methods were developed to handle the different situations.

BEFORE being submitted to combiner or even the execution thread pool

If the partition to migrate has not been submitted to the combiner, we simply abort it, send the partition and cached messages, and let the new node run the map task for it.

AFTER being submitted to combiner or even in the join stage

If the partition to migrate has already been submitted to the combiner, which means we cannot stop it, or has even finished the map task, the new node does not repeat the map task. The new node only processes the cached messages for this partition.
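The two cases reduce to a simple decision on the new node, sketched below; the MapStage enum and function name are illustrative assumptions, not the real worker code.

#include <iostream>

// Hypothetical stage the migrating partition had reached in the execution
// thread pool on the original node when the migration request arrived.
enum class MapStage { kNotSubmitted, kInCombiner, kMapFinished };

void HandleMigratedPartition(MapStage stage_on_original_node) {
  if (stage_on_original_node == MapStage::kNotSubmitted) {
    // The original node aborted the map task before it reached the combiner:
    // the new node runs the map task and then processes the cached messages.
    std::cout << "new node: run map task, then process cached messages\n";
  } else {
    // The map task already reached the combiner (or finished) on the original
    // node, so rerunning it would duplicate results: only process cached messages.
    std::cout << "new node: skip map task, only process cached messages\n";
  }
}

int main() {
  HandleMigratedPartition(MapStage::kNotSubmitted);
  HandleMigratedPartition(MapStage::kInCombiner);
  return 0;
}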

A Limitation

Currently, migration is not supported with the delayed combiner. Previously, the map task for the migrating partition could be executed by both the original node and the new node, and the duplicate map results were simply ignored when the direct combiner (rather than the delayed combiner) was used. With the delayed combiner, the problem is that we cannot distinguish the map result of the migrating partition from the combined map result. However, this should now be easy to handle, since we no longer repeat the map task for the migrating partitions.