Yeongmin Baek
13 min read

Categories

Tags

이번 글에서는 TPU를 이용하기 위해 코드레벨에서 어떤 작업들을 수행해야 하는지 알아봅니다. 데이터를 만드는 부분부터, 실제 학습 루프까지 간단한 구현체로 살펴봅니다. Tensorflow2.x 에서는Keras api를 사용하는 것이 공식적으로 권장되면서, 전반적으로 코드 작성이 매우 간편해졌습니다. 또한 학습 환경또한 Strategy 를 이용해 단일 GPU/ 여러 GPU에 걸친 분산 학습 / TPU를 이용한 학습을 간단하게 전환할 수 있습니다. 실제로 TPU학습을 해보면서 참고했던 코드들(ALBERT, BERT, ELECTRA)을 종합하여 유용했던 내용들을 기록합니다.


목차

  1. 데이터 준비
  2. Tensorflow2.x Training Loop


1. 데이터 준비

일반적으로 TPU를 이용하여 학습을 진행할 때에는 TFRecord 형태의 데이터를 이용합니다. Tensorflow 공식문서를 참고하면 TFRecord 가 무엇이고 를 어떻게 만들수 있는지를 확인할 수 있습니다. 간단하게 설명하면, 효율적인 학습을 위해 일반적인 텍스트(*.txt), 이미지(*.jpg, *.png) 등의 데이터를 serialize하여 binary 형식 *.tfrecord 으로 저장하는 것입니다.

To read data efficiently it can be helpful to serialize your data and store it in a set of files (100-200MB each) that can each be read linearly. This is especially true if the data is being streamed over a network. This can also be useful for caching any data-preprocessing.

본격적으로 BERT의 입력으로 이용될 TFRecord를 어떻게 만드는지 알아보겠습니다. 먼저 가장 간단한 예제는 Google BERT 공식 레포에서 확인할 수 있습니다. 이 코드를 보면 Text파일을 BERT 입력형식에 맞춰진 TFRecord로 만드는 과정을 볼 수 있습니다. 과정은 크게 1) 텍스트 파일 읽기 2) 읽은 파일을 순차적으로 돌면서 BERT의 형식에 맞춘 데이터 만들기 3) TFRecord 형식를 위해 Serialize 후 저장 으로 나눌 수 있습니다. BERT의 코드를 바탕으로 하되, 다른 레포에서 볼 수 있는 유용한 방법들을 추가적으로 기록합니다.

1.1. BERT의 입력 데이터 형식

위 코드를 살펴보면 BERT 학습을 위한 입력 데이터 형식은 다음과 같이 5개의 인자로 구성됩니다.

  • tokens: 하나의 학습 인스턴스를 구성하는 토큰 들의 vocab 인덱스
  • segment_ids: BERT의 각 segment를 구분하기 위한 인덱스(0, 1)
  • is_random_next: NSP 문제의 정답(True: negative, False: positive)
  • masked_lm_positions: MLM 문제를 풀 때, Masking한 토큰의 위치
  • masked_lm_labels: MLM 문제의 정답(masked_lm_postion 에 대응하는 [MASK] 토큰의 정답)
class TrainingInstance(object):
    """A single training instance (sentence pair)."""

    def __init__(self, tokens, segment_ids, masked_lm_positions, masked_lm_labels,
                is_random_next):
        self.tokens = tokens
        self.segment_ids = segment_ids
        self.is_random_next = is_random_next
        self.masked_lm_positions = masked_lm_positions
        self.masked_lm_labels = masked_lm_labels

1.2. BERT의 입력 데이터 만들기

위의 TrainingInstance은 몇개의 간단한 로직들을 거쳐서 만들어집니다.

  1. 입력 파일들을 문서 단위로 만들고 Tokenize하기
      def create_training_instances(input_files, tokenizer, max_seq_length,
                                 dupe_factor, short_seq_prob, masked_lm_prob,
                                 max_predictions_per_seq, rng):
       """Create `TrainingInstance`s from raw text."""
       # Input file format:
       # (1) One sentence per line. These should ideally be actual sentences, not
       # entire paragraphs or arbitrary spans of text. (Because we use the
       # sentence boundaries for the "next sentence prediction" task).
       # (2) Blank lines between documents. Document boundaries are needed so
       # that the "next sentence prediction" task doesn't span between documents.
       for input_file in input_files:
           with tf.gfile.GFile(input_file, "r") as reader:
               while True:
                   line = tokenization.convert_to_unicode(reader.readline())
                   if not line:
                       break
                       line = line.strip()
    
                       # Empty lines are used as document delimiters
                       if not line:
                           all_documents.append([])
                       tokens = tokenizer.tokenize(line)
                       if tokens:
                           all_documents[-1].append(tokens)
       ...
    

    create_trainiing_instance 함수는 위와 같이 간단하게 각 입력 파일들을 읽어서 Tokenize 과정을 거칩니다. 위의 주석에서 알 수 있듯이 텍스트 파일의 한 줄에는 하나의 문장이, 다른 문서들 사이에는 빈 라인이 존재해야 합니다.

  2. 각 문서들을 이용해서 입력 인스턴스 만들기
      def create_instances_from_document(
       all_documents, document_index, max_seq_length, short_seq_prob,
       masked_lm_prob, max_predictions_per_seq, vocab_words, rng):
       """Creates `TrainingInstance`s for a single document."""
       document = all_documents[document_index]
    
       # Traget Sequence Length 정하기
       max_num_tokens = max_seq_length - 3
       target_seq_length = max_num_tokens
       if rng.random() < short_seq_prob:
         target_seq_length = rng.randint(2, max_num_tokens)
       ...
    
       while i < len(document):
         segment = document[i]
         current_chunk.append(segment)
         current_length += len(segment)
         if i == len(document) - 1 or current_length >= target_seq_length:
             # Segment A, Segment B를 구성할 tokens_a, tokens_b를 구하기
             ...
             # 두 Segment가 최대 길이를 넘지 않도록 Truncation
             truncate_seq_pair(tokens_a, tokens_b, max_num_tokens, rng)
             # 특수토큰을 포함하여 두 Segment를 합치고, 이에 따라 segment_id를 만들기
             tokens = []
             segment_ids = []
             ...
             # MLM을 위한 Masking
             (tokens, masked_lm_positions, masked_lm_labels) = create_masked_lm_predictions(
                 tokens, masked_lm_prob, max_predictions_per_seq, vocab_words, rng)
             # Training Instance 로 만들기
             instance = TrainingInstance(
                 tokens=tokens,
                 segment_ids=segment_ids,
                 is_random_next=is_random_next,
                 masked_lm_positions=masked_lm_positions,
                 masked_lm_labels=masked_lm_labels)
             instances.append(instance)
           current_chunk = []
           current_length = 0
         i += 1
    
       return instances
    

    create_instances_from_document 각 문서들을 입력으로 받아서 BERT입력을 만드는 과정을 수행합니다. 세부적인 코드를 모두 들고오면 양이 너무 많기 때문에 중간중간 주석으로 각 기능을 대체했습니다.

  3. TFRecord 형식으로 파일 쓰기
     def write_instance_to_example_files(instances, tokenizer, max_seq_length,
                                     max_predictions_per_seq, output_files):
         """Create TF example files from `TrainingInstance`s."""
         writers = []
         for output_file in output_files:
           writers.append(tf.python_io.TFRecordWriter(output_file))
    
         for (inst_index, instance) in enumerate(instances):
           # 각 인스턴스들을 Int64List의 고정된 길이의 형태로 변환하기
           features = collections.OrderedDict()
           features["input_ids"] = create_int_feature(input_ids)
           features["input_mask"] = create_int_feature(input_mask)
           features["segment_ids"] = create_int_feature(segment_ids)
           features["masked_lm_positions"] = create_int_feature(masked_lm_positions)
           features["masked_lm_ids"] = create_int_feature(masked_lm_ids)
           features["masked_lm_weights"] = create_float_feature(masked_lm_weights)
           features["next_sentence_labels"] = create_int_feature([next_sentence_label])
    
           # tf.train.Example로 변환하기
           tf_example = tf.train.Example(features=tf.train.Features(feature=features))
           writers[writer_index].write(tf_example.SerializeToString())
           writer_index = (writer_index + 1) % len(writers)
    
           total_written += 1
           ...
    
     def create_int_feature(values):
         feature = tf.train.Feature(int64_list=tf.train.Int64List(value=list(values)))
         return feature
    

    위과정을 통해 각 인스턴스들을 tensorflow의 tf.train.Feature, tf.train.Example 을 거쳐 Serialize한 후 저장합니다.

1.3. Static vs Dynamic Masking

BERT의 공식 구현체에서는 위와 같이 구성된 학습 인스턴스를 이용하지만, 이렇게 하면 데이터를 만드는 시점에서 Masking을 진행할 토큰들, SegmentA, SegmentB가 정해지게 됩니다. 따라서 학습 진행 시 매 epoch마다 동일한 Segment 쌍 및 Masking을 보게 됩니다.(Static) BERT 구현체에서는 이를 해결하기 위해 같은 데이터로 랜덤성을 달리하여 n개의 학습 인스턴스를 만들어 이용합니다.(학습 데이터의 N배 용량을 가지게 됩니다.) 또 다른 구현체들(ELECTRA, ALBERT)의 경우 Tensorflow graph상에서 Masking을 구현하여 마스킹이되지 않은 토큰들을 입력으로 받아 이를 GPU/TPU 상에서 마스킹합니다. (Dynamic) 즉, 동일 문장이라도 매번 Masking되는 토큰이 달라집니다. 추가적으로 ELECTRA Dynamic Masking 방법을 알아봅니다.

def mask(config: configure_pretraining.PretrainingConfig,
         inputs: pretrain_data.Inputs, mask_prob, proposal_distribution=1.0,
         disallow_from_mask=None, already_masked=None):
  """Implementation of dynamic masking. The optional arguments aren't needed for
  BERT/ELECTRA and are from early experiments in "strategically" masking out
  tokens instead of uniformly at random.
  Args:
    config: configure_pretraining.PretrainingConfig
    inputs: pretrain_data.Inputs containing input input_ids/input_mask
    mask_prob: percent of tokens to mask
    proposal_distribution: for non-uniform masking can be a [B, L] tensor
                           of scores for masking each position.
    disallow_from_mask: a boolean tensor of [B, L] of positions that should
                        not be masked out
    already_masked: a boolean tensor of [B, N] of already masked-out tokens
                    for multiple rounds of masking
  Returns: a pretrain_data.Inputs with masking added
  """

6개의 인자를 입력으로 받습니다.

  • config: 각종 학습 config가 들어있습니다.
  • inputs: Masking을 진행할 입력이며, input_ids, input_mask(attention mask)로 구성됩니다.
  • mask_prob: Masking 확률입니다.
  • proposal_distribution: 각 토큰의 위치마다 Masking확률의 분포입니다. (기본적으로는 모두 동일하게 설정 = 1입니다.)
  • disallow_from_mask: Masking을 진행하면 안되는 특수 토큰, Padding등의 위치입니다.
  • already_masked: 이미 이전 epoch에서 마스킹이 되었던 토큰들입니다.(이전 epoch과 다른 masking을 하기위해)
  # Get the batch size, sequence length, and max masked-out tokens
  N = config.max_predictions_per_seq
  B, L = modeling.get_shape_list(inputs.input_ids)

  # Find indices where masking out a token is allowed
  vocab = tokenization.FullTokenizer(
      config.vocab_file, do_lower_case=config.do_lower_case).vocab
  candidates_mask = _get_candidates_mask(inputs, vocab, disallow_from_mask)

  # Set the number of tokens to mask out per example
  num_tokens = tf.cast(tf.reduce_sum(inputs.input_mask, -1), tf.float32)
  num_to_predict = tf.maximum(1, tf.minimum(
      N, tf.cast(tf.round(num_tokens * mask_prob), tf.int32)))
  masked_lm_weights = tf.cast(tf.sequence_mask(num_to_predict, N), tf.float32)
  if already_masked is not None:
    masked_lm_weights *= (1 - already_masked)

위의 과정과 같이 inputs.input_mask를 이용해 현재 입력에서 패딩을 제외한 실제 토큰들이 몇 개인지 계산(num_tokens)하고, 이 결과와 마스킹 확률값을 이용해 [MASK]토큰의 갯수를 계산(num_to_predict)합니다.

  # Get a probability of masking each position in the sequence
  candidate_mask_float = tf.cast(candidates_mask, tf.float32)
  sample_prob = (proposal_distribution * candidate_mask_float)
  sample_prob /= tf.reduce_sum(sample_prob, axis=-1, keepdims=True)

  # Sample the positions to mask out
  sample_prob = tf.stop_gradient(sample_prob)
  sample_logits = tf.log(sample_prob)
  masked_lm_positions = tf.random.categorical(
      sample_logits, N, dtype=tf.int32)
  masked_lm_positions *= tf.cast(masked_lm_weights, tf.int32)

  # Get the ids of the masked-out tokens
  shift = tf.expand_dims(L * tf.range(B), -1)
  flat_positions = tf.reshape(masked_lm_positions + shift, [-1, 1])
  masked_lm_ids = tf.gather_nd(tf.reshape(inputs.input_ids, [-1]),
                               flat_positions)
  masked_lm_ids = tf.reshape(masked_lm_ids, [B, -1])
  masked_lm_ids *= tf.cast(masked_lm_weights, tf.int32)

각 토큰들을 셈플링할 확률(sample_prob)을 구하고 tf.random.categorical() 함수를 이용하여 실제로 Masking할 토큰들을 뽑습니다. 이후에 이 label로 이용하기 위해 뽑혀진 토큰들의 id들을 저장합니다.(masked_lm_ids)

  # Update the input ids
  replace_with_mask_positions = masked_lm_positions * tf.cast(
      tf.less(tf.random.uniform([B, N]), 0.85), tf.int32)
  inputs_ids, _ = scatter_update(
      inputs.input_ids, tf.fill([B, N], vocab["[MASK]"]),
      replace_with_mask_positions)

  return pretrain_data.get_updated_inputs(
      inputs,
      input_ids=tf.stop_gradient(inputs_ids),
      masked_lm_positions=masked_lm_positions,
      masked_lm_ids=masked_lm_ids,
      masked_lm_weights=masked_lm_weights
  )

BERT의 설정과 유사한 마스킹 전략을 가져가기 위해 마스킹할 토큰 중 85%의 토큰을 [MASK] 토큰으로 치환하여 input_ids를 만들고 나머지 15%는 동일 토큰으로 유지합니다.


2. Tensorflow2.x Training Loop

Tensorflow2.x 버전에서 간단하게 학습을 할 수 있는 스크립트를 알아봅니다.

2.1. Strategy

먼저 tf.distritubted.Strategy에 대해 알아봅니다. TF 공식 문서를 보면 Strategy가 무엇인지 잘 나와있습니다. 간단하게는 여러 개의 GPU, TPU를 이용해서 분산 학습을 코드의 변화 없이 수행하기 위해서 이용하는 API입니다. 지원되는 학습 환경은 이곳에서 확인할 수 있으며, OneDeviceStrategy 도 지원하기 때문에 사실상 단일 GPU 학습환경도 지원합니다.

tf.distribute.Strategy is a TensorFlow API to distribute training across multiple GPUs, multiple machines or TPUs. Using this API, you can distribute your existing models and training code with minimal code changes.

Tensorflow2.1 버전 기준으로 각 설정에 따라 Strategy를 얻을 수 있는 코드는 다음과 같습니다.

def select_strategy(config: Config) -> tf.distribute.Strategy:
    """
    Configuration을 바탕으로 Strategy를 설정합니다.
    :param config: Training / Inference Config
    :returns: tf.distribute.Strategy
    """
    if config.device == "GPU":
        devices: List[tf.config.PhysicalDevice] = tf.config.list_physical_devices("GPU")
        if len(devices) == 0:
            raise RuntimeError("GPU를 찾지 못했습니다. 혹시 CUDA_VISIBLE_DEVICE를 제대로 설정하셨나요?")
        if len(devices) > 1:
            strategy = tf.distribute.MirroredStrategy()
        else:
            strategy = tf.distribute.OneDeviceStrategy("/gpu:0")
    elif config.device == "TPU":
        resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu=os.environ["TPU_NAME"])
        tf.config.experimental_connect_to_cluster(resolver)
        tf.tpu.experimental.initialize_tpu_system(resolver)
        strategy = tf.distribute.TPUStrategy(resolver)
    else:
        raise ValueError(f"{config.device}는 지원되지 않는 기기입니다.")
    return strategy

위 과정을 통해 GPU가 2개 이상인 경우, MirroredStrategy() (distributed data parallel)로, 1개인 경우 OneDeviceStrategy()로 TPU인 경우 TPUStrategy()로 동작합니다. 위 코드에서 TPU의 경우 추가적으로 TPU_NAME이라는 환경변수가 필요한데, 이는 콘솔에서 TPU 노드를 생성할 때 설정한 이름으로 python script를 실행할 때 환경변수로 넘겨주는 방식 등으로 사용할 수 있습니다. (TPU_NAME=node_1 python run_train.py)

2.2. Dataset

위의 1번과정에서 생성한 TFRecord를 읽어서 실제 학습에 이용할 수 있는 Tensor형태로 변환하기 위해 tf.data.Dataset의 API를 이용합니다. 자세한 내용은 이곳에서 확인할 수 있습니다. 또한 이곳에서 좋은 성능의 인풋 파이프라인을 만드는 법을 확인할 수 있습니다. 먼저 TFRecord를 읽어서 tensor형태로 파싱하는 함수는 다음과 같습니다. 이 코드는 ALBERT를 참고했습니다.

def parse_function(example_proto, name_to_features):
    example = tf.io.parse_single_example(example_proto, name_to_features)
    for name in list(example.keys()):
        t = example[name]
        if t.dtype == tf.int64:
            t = tf.cast(t, tf.int32)
        if name == "attention_mask":
            t = tf.cast(t, tf.float32)
        example[name] = t
    return example

각 example을 읽어서 serialize 했던 데이터들을 다시 파싱한 후 형 변환을 해줍니다.

def build_interleaved_tfrecord_dataset(
    tfrecord_paths: List[str], max_sequence_length: int, batch_size: int, num_cpu_threads: int
):
    dataset = tf.data.Dataset.from_tensor_slices(tf.constant(tfrecord_paths))
    dataset = dataset.shuffle(buffer_size=len(tfrecord_paths))
    dataset = dataset.repeat()

    cycle_length = min(num_cpu_threads, len(tfrecord_paths))

    dataset = dataset.interleave(
        tf.data.TFRecordDataset,
        cycle_length=cycle_length,
        block_length=num_cpu_threads * 4,
        num_parallel_calls=tf.data.experimental.AUTOTUNE,
    )
    dataset = dataset.shuffle(buffer_size=10)

    name_to_features = {
        "input_ids": tf.io.FixedLenFeature([max_sequence_length], tf.int64),
        "attention_mask": tf.io.FixedLenFeature([max_sequence_length], tf.float32),
        "segment_ids": tf.io.FixedLenFeature([max_sequence_length], tf.int64),
        "label": tf.io.FixedLenFeature([], tf.int64),
    }

    feature_parse_fn = partial(parse_function, name_to_features=name_to_features)
    dataset = dataset.map(feature_parse_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.batch(batch_size, drop_remainder=True)
    return dataset

각 입력 파일들의 경로를 입력으로 받아 이를 섞어서 병렬적으로 interleaving 해줍니다. 이 때, cycle_length는 주어진 계산식 보다 크게 가져갈 수 있고, 상황에 따라 변화시켜가며 실험을 통해 얻을 수 있습니다. pre-training에서는 epoch단위 보다 total step을 정해두고 이를 초과한 경우 training loop를 멈추려고 하기 때문에 계속해서 반복하는 dataset.repeat() 을 수행합니다. 결과적으로 읽혀진 데이터셋에 위에서 정의한 parse_function()을 매핑하고 .batch() method로 batching을 진행합니다.

2.2. Training Loop

가장 간단한 Training Loop은 이곳을 참고할 수 있습니다. 위 링크에 따르면, 1) Keras API를 이용한 모델, 옵티마이저를 선언 2) tf.data.Dataset의 API를 이용한 데이터셋의 선언 3) 학습 및 평가 정도로 요약할 수 있습니다. 여기서 추가적으로 Strategy를 이용하기 위해 위의 모든 과정을 Strategy.scope()의 컨텍스트 내부에 선언해야 하고 몇몇 수정이 필요합니다.

from transformers import TFBertForPreTraining, BertConfig

strategy = select_strategy(config)
with strategy.scope():
    # 모델 선언
    model_config = BertConfig()
    model = TFBertForPreTraining(model_config)

huggingface의 transformers를 이용하여 모델을 초기화합니다.

    # 옵티마이저 선언
    # 실제 BERT는 AdamW를 이용하지만 코드의 편의를 위해 Adam을 사용합니다.
    # AdamW는 tf.addons(https://www.tensorflow.org/addons/overview)에 구현되어 있습니다.
    optimizer = tf.keras.optimizers.Adam(learning_rate=config.learning_rate)

    # Learning rate scheduler는 생략합니다.

keras의 optimizer를 이용하여 learning_rate에 맞는 옵티마이저를 선언합니다.

    # 데이터셋 선언
    dataset = build_interleaved_tfrecord_dataset(
        config.training_file_paths, config.max_sequence_length, batch_size, config.num_cpu_threads
    )
    dist_dataset = strategy.experimental_distribute_dataset(dataset)

위에서 정의한 함수를 이용한 데이터셋을 선언합니다. 분산 학습을 위해 strategy.experimental_distribute_dataset() method를 이용합니다.

    # 실제 학습 Loop 선언
    @tf.function
    def train_step(dist_batch):
        def step_fn(batch):
            with tf.GradientTape() as tape:
                model_output = model(
                    [masked_input_ids, batch["attention_mask"], batch["segment_ids"], batch["turn_ids"]],
                    training=True,
                )
                mlm_output = mlm_loss_fn(
                    config.vocab_size, model_output[0], masked_lm_positions, masked_lm_ids, masked_lm_weights
                )
                seq_relationship_output = seq_relationship_loss_fn(
                    config.num_seq_relationship_class, model_output[1], batch["label"]
                )
                total_loss += seq_relationship_output[0]
                total_loss *= 1.0 / strategy.num_replicas_in_sync
                # Metric logging 등의 과정은 생략합니다.

            training_vars = model.trainable_variables
            gradients = tape.gradient(total_loss, training_vars)
            optimizer.apply_gradients(zip(gradients, training_vars))

            return total_loss

        strategy.experimental_run_v2(step_fn, args=(dist_batch,))

위와 같이 실제 학습에서 하나의 스텝에 해당하는 함수를 작성합니다. 대부분의 과정은 위의 튜토리얼 링크와 유사합니다. 하지만 다음과 같이 몇 가지 차이점이 있습니다.

  1. Tensorflow2.x는 eager execution이 기본 옵션이기 때문에, tf.function를 이용해 이를 graph상에서 연산하도록 해주어야 합니다.
  2. strategy.num_replicas_in_sync는 모델이 복사되어있는 수(일반적으로 GPU 수) 인데 최종 loss에서 이를 나눠줌으로써, 싱글 GPU와 동일하게 스케일을 조절할 수 있습니다.
  3. 마지막에 strategy.experimental_run_v2() method를 이용하여 배치를 각 GPU에 분배하고 이를 이용해 학습을 진행합니다.
    for step, dist_batch in enumerate(dist_dataset):
        train_step(dist_batch)

최종적으로 data_loader를 돌면서 학습을 진행합니다.


여기까지 학습을 돌리기 위한 예제들을 대략적으로 살펴보았습니다. 위 예제들은 편의상 많은 부분들이 생략되어 있습니다. 시간이 나는대로 곧 처음부터 끝까지 코드를 정리해서 github에 올릴 예정입니다. 다음 글에서는 실제로 TPU에서 학습을 진행하면서 겪었던 문제점들에 대해 정리해보겠습니다.


Reference