Skip to content

Commit

Permalink
Fix argo parameter validation keys.
Browse files Browse the repository at this point in the history
Fix to correctly serialize logger scope values.

Signed-off-by: Victor Chang <[email protected]>
  • Loading branch information
mocsharp committed Sep 1, 2023
1 parent 3cababe commit 01998a2
Show file tree
Hide file tree
Showing 21 changed files with 189 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,25 @@
* limitations under the License.
*/

namespace Monai.Deploy.WorkflowManager.TaskManager.Argo.StaticValues
using System.Collections.Generic;

namespace Monai.Deploy.WorkflowManager.Common.Configuration
{
internal static class Keys
public static class ArgoParameters
{
public struct ResourcesKey
{
public string TaskKey { get; set; }
public string ArgoKey { get; set; }
}
public static class ResourcesKeys
{
public static readonly ResourcesKey GpuLimit = new() { TaskKey = GpuRequired, ArgoKey = "nvidia.com/gpu" };

public static readonly ResourcesKey MemoryLimit = new() { TaskKey = Memory, ArgoKey = "memory" };

public static readonly ResourcesKey CpuLimit = new() { TaskKey = Cpu, ArgoKey = "cpu" };
}
/// <summary>
/// Key for the namespace where the Argo workflows are stored and executed.
/// </summary>
Expand Down Expand Up @@ -76,26 +91,66 @@ internal static class Keys
/// <summary>
/// Key for resource limitations
/// </summary>
public static readonly string ArgoResource = "resources";
public static readonly string Resources = "resources";

/// <summary>
/// Key for resource limitations
/// </summary>
public static readonly string ArgoParameters = "parameters";
public static readonly string Parameters = "parameters";

/// <summary>
/// Key for priority classnames on task plugin arguments side
/// </summary>
public static readonly string TaskPriorityClassName = "priority";

/// <summary>
/// Key for the CPU.
/// </summary>
public static readonly string Cpu = "cpu";

/// <summary>
/// Key for the memory.
/// </summary>
public static readonly string Memory = "memory";

/// <summary>
/// Key for the GPU.
/// </summary>
public static readonly string GpuRequired = "gpu_required";

/// <summary>
/// Required arguments to run the Argo workflow.
/// </summary>
public static readonly IReadOnlyList<string> RequiredParameters =
new List<string> {
BaseUrl,
WorkflowTemplateName
};

/// <summary>
/// Required arguments to run the Argo workflow.
/// </summary>
public static readonly IReadOnlyList<string> VaildParameters =
new List<string> {
Namespace,
BaseUrl,
AllowInsecureseUrl,
WorkflowTemplateName,
TimeoutSeconds,
ArgoApiToken,
MessagingEndpoint,
MessagingUsername,
MessagingPassword,
MessagingExchange,
MessagingVhost,
Resources,
Parameters,
TaskPriorityClassName,
Cpu,
Memory,
GpuRequired,
};

/// <summary>
/// Required settings to run the Argo workflow.
/// </summary>
Expand Down
40 changes: 40 additions & 0 deletions src/Common/Miscellaneous/LoggingDataDictionary.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2021-2023 MONAI Consortium
* Copyright 2021 NVIDIA Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Globalization;
using System.Runtime.Serialization;

namespace Monai.Deploy.WorkflowManager.Common.Miscellaneous
{
[Serializable]
public class LoggingDataDictionary<TKey, TValue> : Dictionary<TKey, TValue> where TKey : notnull
{
public LoggingDataDictionary()
{
}

protected LoggingDataDictionary(SerializationInfo info, StreamingContext context) : base(info, context)
{
}

public override string ToString()
{
var pairs = this.Select(x => string.Format(CultureInfo.InvariantCulture, "{0}={1}", x.Key, x.Value));
return string.Join(", ", pairs);
}
}
}
33 changes: 0 additions & 33 deletions src/Common/Miscellaneous/ValidationConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,6 @@ public static class ValidationConstants
/// </summary>
public static readonly string Notifications = "notifications";

/// <summary>
/// Key for the CPU.
/// </summary>
public static readonly string Cpu = "cpu";

/// <summary>
/// Key for the memory.
/// </summary>
public static readonly string Memory = "memory";

/// <summary>
/// Key for the GPU.
/// </summary>
public static readonly string GpuRequired = "gpu_required";

/// <summary>
/// Key for recipient emails.
/// </summary>
Expand Down Expand Up @@ -107,24 +92,6 @@ public enum NotificationValues
Mode
};

/// <summary>
/// Key for the endpoint where the Argo server is running.
/// </summary>
public static readonly string BaseUrl = "server_url";

/// <summary>
/// Key for the name of the main 'WorkflowTemplate' stored on the targeted Argo server.
/// </summary>
public static readonly string WorkflowTemplateName = "workflow_template_name";

/// <summary>
/// Required arguments to run the Argo task args.
/// </summary>
public static readonly IReadOnlyList<string> ArgoRequiredParameters =
new List<string> {
BaseUrl,
WorkflowTemplateName
};


/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private void ValidateEventAndInit()

public override async Task<ExecutionStatus> ExecuteTask(CancellationToken cancellationToken = default)
{
using var loggingScope = _logger.BeginScope(new Dictionary<string, object>
using var loggingScope = _logger.BeginScope(new LoggingDataDictionary<string, object>
{
["correlationId"] = Event.CorrelationId,
["workflowInstanceId"] = Event.WorkflowInstanceId,
Expand Down
73 changes: 36 additions & 37 deletions src/TaskManager/Plug-ins/Argo/ArgoPlugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
using Monai.Deploy.WorkflowManager.TaskManager.API.Extensions;
using Monai.Deploy.WorkflowManager.TaskManager.API.Models;
using Monai.Deploy.WorkflowManager.TaskManager.Argo.Logging;
using Monai.Deploy.WorkflowManager.TaskManager.Argo.StaticValues;
using Newtonsoft.Json;

[assembly: PlugIn()]
Expand Down Expand Up @@ -84,7 +83,7 @@ public ArgoPlugin(

private void Initialize()
{
using var loggingScope = _logger.BeginScope(new Dictionary<string, object>
using var loggingScope = _logger.BeginScope(new LoggingDataDictionary<string, object>
{
["correlationId"] = Event.CorrelationId,
["workflowInstanceId"] = Event.WorkflowInstanceId,
Expand All @@ -93,54 +92,54 @@ private void Initialize()
["payloadId"] = Event.PayloadId
});

if (Event.TaskPluginArguments.ContainsKey(Keys.TimeoutSeconds) &&
!string.IsNullOrWhiteSpace(Event.TaskPluginArguments[Keys.TimeoutSeconds]) &&
int.TryParse(Event.TaskPluginArguments[Keys.TimeoutSeconds], out var result))
if (Event.TaskPluginArguments.ContainsKey(ArgoParameters.TimeoutSeconds) &&
!string.IsNullOrWhiteSpace(Event.TaskPluginArguments[ArgoParameters.TimeoutSeconds]) &&
int.TryParse(Event.TaskPluginArguments[ArgoParameters.TimeoutSeconds], out var result))
{
_activeDeadlineSeconds = result;
}

if (Event.TaskPluginArguments.ContainsKey(Keys.ArgoApiToken) &&
!string.IsNullOrWhiteSpace(Event.TaskPluginArguments[Keys.ArgoApiToken]))
if (Event.TaskPluginArguments.ContainsKey(ArgoParameters.ArgoApiToken) &&
!string.IsNullOrWhiteSpace(Event.TaskPluginArguments[ArgoParameters.ArgoApiToken]))
{
_apiToken = Event.TaskPluginArguments[Keys.ArgoApiToken];
_apiToken = Event.TaskPluginArguments[ArgoParameters.ArgoApiToken];
}

bool updateEvent = false;

if (Event.TaskPluginArguments.ContainsKey(Keys.Namespace) &&
!string.IsNullOrWhiteSpace(Event.TaskPluginArguments[Keys.Namespace]))
if (Event.TaskPluginArguments.ContainsKey(ArgoParameters.Namespace) &&
!string.IsNullOrWhiteSpace(Event.TaskPluginArguments[ArgoParameters.Namespace]))
{
_namespace = Event.TaskPluginArguments[Keys.Namespace];
_namespace = Event.TaskPluginArguments[ArgoParameters.Namespace];
}
else
{
_namespace = Strings.DefaultNamespace;
Event.TaskPluginArguments.Add(Keys.Namespace, _namespace);
Event.TaskPluginArguments.Add(ArgoParameters.Namespace, _namespace);
updateEvent = true;
}

if (Event.TaskPluginArguments.ContainsKey(Keys.AllowInsecureseUrl) &&
!string.IsNullOrWhiteSpace(Event.TaskPluginArguments[Keys.AllowInsecureseUrl]))
if (Event.TaskPluginArguments.ContainsKey(ArgoParameters.AllowInsecureseUrl) &&
!string.IsNullOrWhiteSpace(Event.TaskPluginArguments[ArgoParameters.AllowInsecureseUrl]))
{
_allowInsecure = string.Compare("true", Event.TaskPluginArguments[Keys.AllowInsecureseUrl], true) == 0;
_allowInsecure = string.Compare("true", Event.TaskPluginArguments[ArgoParameters.AllowInsecureseUrl], true) == 0;
}
else
{
_allowInsecure = true;
Event.TaskPluginArguments.Add(Keys.AllowInsecureseUrl, "true");
Event.TaskPluginArguments.Add(ArgoParameters.AllowInsecureseUrl, "true");
updateEvent = true;
}

if (Event.TaskPluginArguments.ContainsKey(Keys.BaseUrl) &&
!string.IsNullOrWhiteSpace(Event.TaskPluginArguments[Keys.BaseUrl]))
if (Event.TaskPluginArguments.ContainsKey(ArgoParameters.BaseUrl) &&
!string.IsNullOrWhiteSpace(Event.TaskPluginArguments[ArgoParameters.BaseUrl]))
{
_baseUrl = Event.TaskPluginArguments[Keys.BaseUrl];
_baseUrl = Event.TaskPluginArguments[ArgoParameters.BaseUrl];
}
else
{
_baseUrl = _options.Value.TaskManager.ArgoPluginArguments.ServerUrl;
Event.TaskPluginArguments.Add(Keys.BaseUrl, _baseUrl);
Event.TaskPluginArguments.Add(ArgoParameters.BaseUrl, _baseUrl);
updateEvent = true;
}

Expand All @@ -161,10 +160,10 @@ private void ValidateEvent()
{
if (Event.TaskPluginArguments is null || Event.TaskPluginArguments.Count == 0)
{
throw new InvalidTaskException($"Required parameters to execute Argo workflow are missing: {string.Join(',', Keys.RequiredParameters)}");
throw new InvalidTaskException($"Required parameters to execute Argo workflow are missing: {string.Join(',', ArgoParameters.RequiredParameters)}");
}

foreach (var key in Keys.RequiredParameters)
foreach (var key in ArgoParameters.RequiredParameters)
{
if (!Event.TaskPluginArguments.ContainsKey(key) &&
string.IsNullOrWhiteSpace(Event.TaskPluginArguments[key]))
Expand All @@ -173,24 +172,24 @@ private void ValidateEvent()
}
}

foreach (var key in Keys.RequiredSettings)
foreach (var key in ArgoParameters.RequiredSettings)
{
if (!_options.Value.Messaging.PublisherSettings.ContainsKey(key))
{
throw new ConfigurationException($"Required message publisher setting to execute Argo workflow is missing: {key}");
}
}

if (Event.TaskPluginArguments.ContainsKey(Keys.BaseUrl) &&
!string.IsNullOrWhiteSpace(Event.TaskPluginArguments[Keys.BaseUrl]) && !Uri.IsWellFormedUriString(Event.TaskPluginArguments[Keys.BaseUrl], UriKind.Absolute))
if (Event.TaskPluginArguments.ContainsKey(ArgoParameters.BaseUrl) &&
!string.IsNullOrWhiteSpace(Event.TaskPluginArguments[ArgoParameters.BaseUrl]) && !Uri.IsWellFormedUriString(Event.TaskPluginArguments[ArgoParameters.BaseUrl], UriKind.Absolute))
{
throw new InvalidTaskException($"The value '{Event.TaskPluginArguments[Keys.BaseUrl]}' provided for '{Keys.BaseUrl}' is not a valid URI.");
throw new InvalidTaskException($"The value '{Event.TaskPluginArguments[ArgoParameters.BaseUrl]}' provided for '{ArgoParameters.BaseUrl}' is not a valid URI.");
}
}

public override async Task<ExecutionStatus> ExecuteTask(CancellationToken cancellationToken = default)
{
using var loggingScope = _logger.BeginScope(new Dictionary<string, object>
using var loggingScope = _logger.BeginScope(new LoggingDataDictionary<string, object>
{
["correlationId"] = Event.CorrelationId,
["workflowInstanceId"] = Event.WorkflowInstanceId,
Expand Down Expand Up @@ -386,7 +385,7 @@ private async Task<Workflow> BuildWorkflowWrapper(CancellationToken cancellation
Kind = Strings.KindWorkflow,
Metadata = new ObjectMeta()
{
GenerateName = $"md-{Event.TaskPluginArguments![Keys.WorkflowTemplateName]}-",
GenerateName = $"md-{Event.TaskPluginArguments![ArgoParameters.WorkflowTemplateName]}-",
Labels = new Dictionary<string, string>
{
{ Strings.TaskIdLabelSelectorName, Event.TaskId! },
Expand Down Expand Up @@ -417,7 +416,7 @@ private async Task<Workflow> BuildWorkflowWrapper(CancellationToken cancellation

_logger.ArgoWorkflowTemplateGenerated(workflow.Metadata.GenerateName);
var workflowJson = JsonConvert.SerializeObject(workflow, Formatting.Indented);
workflowJson = workflowJson.Replace(_options.Value.Messaging.PublisherSettings[Keys.MessagingPassword], "*****");
workflowJson = workflowJson.Replace(_options.Value.Messaging.PublisherSettings[ArgoParameters.MessagingPassword], "*****");

_logger.ArgoWorkflowTemplateJson(workflow.Metadata.GenerateName, workflowJson);

Expand All @@ -433,19 +432,19 @@ private async Task<Workflow> BuildWorkflowWrapper(CancellationToken cancellation
private void ProcessTaskPluginArguments(Workflow workflow)
{
Guard.Against.Null(workflow, nameof(workflow));
var priorityClassName = Event.GetTaskPluginArgumentsParameter(Keys.TaskPriorityClassName) ?? "standard";
var priorityClassName = Event.GetTaskPluginArgumentsParameter(ArgoParameters.TaskPriorityClassName) ?? "standard";

foreach (var template in workflow.Spec.Templates)
{
AddLimit(template, ResourcesKeys.CpuLimit);
AddLimit(template, ResourcesKeys.MemoryLimit);
AddLimit(template, ResourcesKeys.GpuLimit);
AddLimit(template, ArgoParameters.ResourcesKeys.CpuLimit);
AddLimit(template, ArgoParameters.ResourcesKeys.MemoryLimit);
AddLimit(template, ArgoParameters.ResourcesKeys.GpuLimit);
template.PriorityClassName = priorityClassName;
}
workflow.Spec.PodPriorityClassName = priorityClassName;
}

private void AddLimit(Template2 template, ResourcesKey key)
private void AddLimit(Template2 template, ArgoParameters.ResourcesKey key)
{
Guard.Against.Null(template, nameof(template));
Guard.Against.Null(key, nameof(key));
Expand All @@ -463,7 +462,7 @@ private void AddLimit(Template2 template, ResourcesKey key)
}

// Convert true / false value to 0 or 1 for number of GPU
if (key.TaskKey == ResourcesKeys.GpuLimit.TaskKey)
if (key.TaskKey == ArgoParameters.ResourcesKeys.GpuLimit.TaskKey)
{
value = bool.TryParse(value, out bool gpuRequired) && gpuRequired ? "1" : "0";
}
Expand All @@ -475,11 +474,11 @@ private async Task AddMainWorkflowTemplate(Workflow workflow, CancellationToken
{
Guard.Against.Null(workflow, nameof(workflow));

var workflowTemplate = await LoadWorkflowTemplate(Event.TaskPluginArguments![Keys.WorkflowTemplateName]).ConfigureAwait(false);
var workflowTemplate = await LoadWorkflowTemplate(Event.TaskPluginArguments![ArgoParameters.WorkflowTemplateName]).ConfigureAwait(false);

if (workflowTemplate is null)
{
throw new TemplateNotFoundException(Event.TaskPluginArguments![Keys.WorkflowTemplateName]);
throw new TemplateNotFoundException(Event.TaskPluginArguments![ArgoParameters.WorkflowTemplateName]);
}
var mainTemplateSteps = new Template2()
{
Expand Down
Loading

0 comments on commit 01998a2

Please sign in to comment.