Skip to content

Commit

Permalink
Refactor the Autosubmit PullRequestCheck class so it can support mult…
Browse files Browse the repository at this point in the history
…iple methods (#2921)

The goal of this refactor is to made the coming work of separating the revert class much easier. This refactor will allow us to separate the checks for pull requests and revert pull requests to separate endpoints within autosubmit before ultimately separating them to individual services.

Add the revert queue topic and subscription and make several of the static pubsub variables in the Config class non static as getters because these fields are not inheritable.

*List which issues are fixed by this PR. You must list at least one issue.*
Part of flutter/flutter#113867

*If you had to change anything in the [flutter/tests] repo, include a link to the migration guide as per the [breaking change policy].*
  • Loading branch information
ricardoamador authored Jul 11, 2023
1 parent 2859b12 commit f1216a8
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 95 deletions.
95 changes: 9 additions & 86 deletions auto_submit/lib/requests/check_pull_request.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,108 +3,31 @@
// found in the LICENSE file.

import 'dart:async';
import 'dart:convert';

import 'package:auto_submit/requests/check_request.dart';
import 'package:auto_submit/service/approver_service.dart';
import 'package:auto_submit/service/config.dart';
import 'package:github/github.dart';
import 'package:googleapis/pubsub/v1.dart' as pub;
import 'package:shelf/shelf.dart';
import '../service/validation_service.dart';

import '../request_handling/pubsub.dart';

import '../service/log.dart';
import '../server/authenticated_request_handler.dart';

/// Handler for processing pull requests with 'autosubmit' label.
///
/// For pull requests where an 'autosubmit' label was added in pubsub,
/// check if the pull request is mergable.
class CheckPullRequest extends AuthenticatedRequestHandler {
class CheckPullRequest extends CheckRequest {
const CheckPullRequest({
required super.config,
required super.cronAuthProvider,
this.approverProvider = ApproverService.defaultProvider,
this.pubsub = const PubSub(),
super.approverProvider = ApproverService.defaultProvider,
super.pubsub = const PubSub(),
});

final PubSub pubsub;
final ApproverServiceProvider approverProvider;

static const int kPullMesssageBatchSize = 100;
static const int kPubsubPullNumber = 5;

@override
Future<Response> get() async {
final Set<int> processingLog = <int>{};
final List<pub.ReceivedMessage> messageList = await pullMessages();
if (messageList.isEmpty) {
log.info('No messages are pulled.');
return Response.ok('No messages are pulled.');
}

log.info('Processing ${messageList.length} messages');
final ValidationService validationService = ValidationService(config);
final List<Future<void>> futures = <Future<void>>[];

for (pub.ReceivedMessage message in messageList) {
final String messageData = message.message!.data!;

final rawBody = json.decode(String.fromCharCodes(base64.decode(messageData))) as Map<String, dynamic>;
final PullRequest pullRequest = PullRequest.fromJson(rawBody);

log.info('Processing message ackId: ${message.ackId}');
log.info('Processing mesageId: ${message.message!.messageId}');
log.info('Processing PR: $rawBody');
if (processingLog.contains(pullRequest.number)) {
// Ack duplicate.
log.info('Ack the duplicated message : ${message.ackId!}.');
await pubsub.acknowledge(
Config.pubsubPullRequestSubscription,
message.ackId!,
);

continue;
} else {
final ApproverService approver = approverProvider(config);
log.info('Checking auto approval of pull request: $rawBody');
await approver.autoApproval(pullRequest);
processingLog.add(pullRequest.number!);
}

futures.add(
validationService.processMessage(
pullRequest,
message.ackId!,
pubsub,
),
);
}
await Future.wait(futures);
return Response.ok('Finished processing changes');
}

/// Pulls queued Pub/Sub messages.
///
/// Pub/Sub pull request API doesn't guarantee returning all messages each time. This
/// loops to pull `kPubsubPullNumber` times to try covering all queued messages.
Future<List<pub.ReceivedMessage>> pullMessages() async {
final Map<String, pub.ReceivedMessage> messageMap = <String, pub.ReceivedMessage>{};
for (int i = 0; i < kPubsubPullNumber; i++) {
final pub.PullResponse pullResponse = await pubsub.pull(
Config.pubsubPullRequestSubscription,
kPullMesssageBatchSize,
);
final List<pub.ReceivedMessage>? receivedMessages = pullResponse.receivedMessages;
if (receivedMessages == null) {
continue;
}
for (pub.ReceivedMessage message in receivedMessages) {
final String messageId = message.message!.messageId!;
messageMap[messageId] = message;
}
}
return messageMap.values.toList();
return process(
config.pubsubPullRequestSubscription,
config.kPubsubPullNumber,
config.kPullMesssageBatchSize,
);
}
}
124 changes: 124 additions & 0 deletions auto_submit/lib/requests/check_request.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2023 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:convert';

import 'package:auto_submit/server/authenticated_request_handler.dart';
import 'package:auto_submit/service/approver_service.dart';
import 'package:auto_submit/service/validation_service.dart';
import 'package:github/github.dart';
import 'package:googleapis/pubsub/v1.dart' as pub;
import 'package:shelf/shelf.dart';

import '../request_handling/pubsub.dart';

import '../service/log.dart';

abstract class CheckRequest extends AuthenticatedRequestHandler {
const CheckRequest({
required super.config,
required super.cronAuthProvider,
this.approverProvider = ApproverService.defaultProvider,
this.pubsub = const PubSub(),
});

final PubSub pubsub;
final ApproverServiceProvider approverProvider;

@override
Future<Response> get();

/// Process pull request messages from Pubsub.
Future<Response> process(
String pubSubSubscription,
int pubSubPulls,
int pubSubBatchSize,
) async {
final Set<int> processingLog = <int>{};
final List<pub.ReceivedMessage> messageList = await pullMessages(
pubSubSubscription,
pubSubPulls,
pubSubBatchSize,
);
if (messageList.isEmpty) {
log.info('No messages are pulled.');
return Response.ok('No messages are pulled.');
}

log.info('Processing ${messageList.length} messages');
// TODO (ricardoamador): This validation service will be passed in by the calling class.
final ValidationService validationService = ValidationService(config);
final List<Future<void>> futures = <Future<void>>[];

for (pub.ReceivedMessage message in messageList) {
log.info(message.toJson());
assert(message.message != null);
assert(message.message!.data != null);
final String messageData = message.message!.data!;

final Map<String, dynamic> rawBody =
json.decode(String.fromCharCodes(base64.decode(messageData))) as Map<String, dynamic>;
log.info('request raw body = $rawBody');

final PullRequest pullRequest = PullRequest.fromJson(rawBody);

log.info('Processing message ackId: ${message.ackId}');
log.info('Processing mesageId: ${message.message!.messageId}');
log.info('Processing PR: $rawBody');
if (processingLog.contains(pullRequest.number)) {
// Ack duplicate.
log.info('Ack the duplicated message : ${message.ackId!}.');
await pubsub.acknowledge(
pubSubSubscription,
message.ackId!,
);

continue;
} else {
final ApproverService approver = approverProvider(config);
log.info('Checking auto approval of pull request: $rawBody');
await approver.autoApproval(pullRequest);
processingLog.add(pullRequest.number!);
}

futures.add(
validationService.processMessage(
// pullRequestMessage,
pullRequest,
message.ackId!,
pubsub,
),
);
}
await Future.wait(futures);
return Response.ok('Finished processing changes');
}

/// Pulls queued Pub/Sub messages.
///
/// Pub/Sub pull request API doesn't guarantee returning all messages each time. This
/// loops to pull `kPubsubPullNumber` times to try covering all queued messages.
Future<List<pub.ReceivedMessage>> pullMessages(
String subscription,
int pulls,
int batchSize,
) async {
final Map<String, pub.ReceivedMessage> messageMap = <String, pub.ReceivedMessage>{};
for (int i = 0; i < pulls; i++) {
final pub.PullResponse pullResponse = await pubsub.pull(
subscription,
batchSize,
);
final List<pub.ReceivedMessage>? receivedMessages = pullResponse.receivedMessages;
if (receivedMessages == null) {
continue;
}
for (pub.ReceivedMessage message in receivedMessages) {
final String messageId = message.message!.messageId!;
messageMap[messageId] = message;
}
}
return messageMap.values.toList();
}
}
33 changes: 33 additions & 0 deletions auto_submit/lib/requests/check_revert_request.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'package:auto_submit/request_handling/pubsub.dart';
import 'package:auto_submit/requests/check_request.dart';
import 'package:auto_submit/service/approver_service.dart';
import 'package:shelf/shelf.dart';

// TODO (ricardoamador): provide implementation in https://github.com/flutter/flutter/issues/113867

/// Handler for processing pull requests with 'revert' label.
///
/// For pull requests where an 'revert' label was added in pubsub,
/// check if the revert request is mergable.
class CheckRevertRequest extends CheckRequest {
const CheckRevertRequest({
required super.config,
required super.cronAuthProvider,
super.approverProvider = ApproverService.defaultProvider,
super.pubsub = const PubSub(),
});

@override
Future<Response> get() async {
/// Currently this is unused and cannot be called.
return process(
config.pubsubRevertRequestSubscription,
config.kPubsubPullNumber,
config.kPullMesssageBatchSize,
);
}
}
2 changes: 1 addition & 1 deletion auto_submit/lib/requests/github_webhook.dart
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class GithubWebhook extends RequestHandler {

if (hasAutosubmit || hasRevertLabel) {
log.info('Found pull request with auto submit and/or revert label.');
await pubsub.publish(Config.pubsubPullRequestTopic, pullRequest);
await pubsub.publish(config.pubsubPullRequestTopic, pullRequest);
}

return Response.ok(rawBody);
Expand Down
20 changes: 13 additions & 7 deletions auto_submit/lib/service/config.dart
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,26 @@ class Config {
'DartDevtoolWorkflowBot',
};

/// Repository configuration variables
Duration get repositoryConfigurationTtl => const Duration(minutes: 10);

/// PubSub configs
int get kPullMesssageBatchSize => 100;

/// Number of Pub/Sub pull calls in each cron job run.
///
/// TODO(keyonghan): monitor and optimize this number based on response time
/// https://github.com/flutter/cocoon/pull/2035/files#r938143840.
int get kPubsubPullNumber => 5;

/// Repository configuration variables
Duration get repositoryConfigurationTtl => const Duration(minutes: 10);
static String get pubsubTopicsPrefix => 'projects/flutter-dashboard/topics';
static String get pubsubSubscriptionsPrefix => 'projects/flutter-dashboard/subscriptions';

/// PubSub configs
static const String pubsubTopicsPrefix = 'projects/flutter-dashboard/topics';
static const String pubsubSubscriptionsPrefix = 'projects/flutter-dashboard/subscriptions';
static const String pubsubPullRequestTopic = 'auto-submit-queue';
static const String pubsubPullRequestSubscription = 'auto-submit-queue-sub';
String get pubsubPullRequestTopic => 'auto-submit-queue';
String get pubsubPullRequestSubscription => 'auto-submit-queue-sub';

String get pubsubRevertRequestTopic => 'auto-submit-revert-queue';
String get pubsubRevertRequestSubscription => 'auto-submit-revert-queue-sub';

/// Retry options for timing related retryable code.
static const RetryOptions mergeRetryOptions = RetryOptions(
Expand Down
3 changes: 2 additions & 1 deletion auto_submit/test/requests/check_pull_request_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void main() {
late List<QueryOptions> expectedOptions;
late QueryOptions flutterOption;
late QueryOptions cocoonOption;
const String testSubscription = 'test-sub';
const String testTopic = 'test-topic';
const String rollorAuthor = "engine-flutter-autoroll";
const String labelName = "warning: land on red to fix tree breakage";
Expand Down Expand Up @@ -780,7 +781,7 @@ void main() {
prNumber: 0,
lastCommitHash: oid,
);
final List<pub.ReceivedMessage> messages = await checkPullRequest.pullMessages();
final List<pub.ReceivedMessage> messages = await checkPullRequest.pullMessages(testSubscription, 5, 5);
expect(messages.length, 3);
});
});
Expand Down

0 comments on commit f1216a8

Please sign in to comment.