Skip to content

Commit

Permalink
NMS-15776: Parallelize DNS lookups during audit phase
Browse files Browse the repository at this point in the history
  • Loading branch information
christianpape committed Jun 23, 2023
1 parent 021d516 commit 6505114
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,35 @@

package org.opennms.netmgt.provision.service;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.opennms.netmgt.provision.persist.AbstractRequisitionVisitor;
import org.opennms.netmgt.provision.persist.OnmsAssetRequisition;
import org.opennms.netmgt.provision.persist.OnmsInterfaceMetaDataRequisition;
import org.opennms.netmgt.provision.persist.OnmsIpInterfaceRequisition;
import org.opennms.netmgt.provision.persist.OnmsNodeMetaDataRequisition;
import org.opennms.netmgt.provision.persist.OnmsMonitoredServiceRequisition;
import org.opennms.netmgt.provision.persist.OnmsNodeCategoryRequisition;
import org.opennms.netmgt.provision.persist.OnmsNodeMetaDataRequisition;
import org.opennms.netmgt.provision.persist.OnmsNodeRequisition;
import org.opennms.netmgt.provision.persist.OnmsServiceMetaDataRequisition;
import org.opennms.netmgt.provision.persist.requisition.Requisition;
import org.opennms.netmgt.provision.service.operations.ImportOperationsManager;
import org.opennms.netmgt.provision.service.operations.SaveOrUpdateOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RequisitionAccountant extends AbstractRequisitionVisitor {
private final ImportOperationsManager m_opsMgr;
private static final Logger LOG = LoggerFactory.getLogger(RequisitionAccountant.class);

private final ImportOperationsManager m_opsMgr;
private SaveOrUpdateOperation m_currentOp;
private String monitorKey;


private final Set<CompletableFuture<Void>> m_dnsLookups = Collections.synchronizedSet(new HashSet<>());

/**
* <p>Constructor for RequisitionAccountant.</p>
*
Expand All @@ -70,8 +82,8 @@ public void completeNode(OnmsNodeRequisition nodeReq) {
/** {@inheritDoc} */
@Override
public void visitInterface(OnmsIpInterfaceRequisition ifaceReq) {
m_currentOp.foundInterface(ifaceReq.getIpAddr(), ifaceReq.getDescr(), ifaceReq.getSnmpPrimary(), ifaceReq.getManaged(), ifaceReq.getStatus());

m_currentOp.foundInterface(ifaceReq.getIpAddr(), ifaceReq.getDescr(), ifaceReq.getSnmpPrimary(), ifaceReq.getManaged(), ifaceReq.getStatus(), m_dnsLookups);
LOG.debug("{} DNS lookups scheduled, {} DNS lookups completed", dnsLookupsTotal(), dnsLookupsCompleted());
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -106,4 +118,23 @@ public void visitInterfaceMetaData(OnmsInterfaceMetaDataRequisition metaDataReq)
public void visitServiceMetaData(OnmsServiceMetaDataRequisition metaDataReq) {
m_currentOp.foundServiceMetaData(metaDataReq.getContext(), metaDataReq.getKey(), metaDataReq.getValue());
}

int dnsLookupsCompleted() {
return (int) m_dnsLookups.stream().filter(f -> f.isDone()).count();
}

int dnsLookupsPending() {
return dnsLookupsTotal() - dnsLookupsCompleted();
}

int dnsLookupsTotal() {
return m_dnsLookups.size();
}

@Override
public void completeModelImport(Requisition req) {
LOG.debug("Waiting for {} scheduled DNS lookups, {} DNS lookups pending", dnsLookupsTotal(), dnsLookupsPending());
CompletableFuture.allOf(m_dnsLookups.toArray(CompletableFuture[]::new)).join();
LOG.debug("All {} scheduled DNS lookups completed", dnsLookupsTotal());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@
package org.opennms.netmgt.provision.service.operations;

import java.net.InetAddress;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.opennms.core.utils.InetAddressUtils;
import org.opennms.netmgt.dao.api.MonitoringLocationDao;
import org.opennms.netmgt.model.OnmsCategory;
import org.opennms.netmgt.model.OnmsIpInterface;
Expand Down Expand Up @@ -99,8 +100,9 @@ public ScanManager getScanManager() {
* @param primaryType a {@link InterfaceSnmpPrimaryType} object.
* @param managed a boolean.
* @param status a int.
* @param dnsLookups the list of DNS lookup futures.
*/
public void foundInterface(InetAddress addr, Object descr, final PrimaryType primaryType, boolean managed, int status) {
public void foundInterface(InetAddress addr, Object descr, final PrimaryType primaryType, boolean managed, int status, final Set<CompletableFuture<Void>> dnsLookups) {

if (addr == null) {
LOG.error("Found interface on node {} with an empty/invalid ipaddr! Ignoring!", m_node.getLabel());
Expand All @@ -111,10 +113,6 @@ public void foundInterface(InetAddress addr, Object descr, final PrimaryType pri
m_currentInterface.setIsManaged(status == 3 ? "U" : "M");
m_currentInterface.setIsSnmpPrimary(primaryType);

if (addr != null && System.getProperty("org.opennms.provisiond.reverseResolveRequisitionIpInterfaceHostnames", "true").equalsIgnoreCase("true")) {
m_currentInterface.setIpHostName(getProvisionService().getHostnameResolver().getHostname(addr, m_node.getLocation().getLocationName()));
}

if (PrimaryType.PRIMARY.equals(primaryType)) {
if (addr != null) {
m_scanManager = new ScanManager(getProvisionService().getLocationAwareSnmpClient(), addr);
Expand All @@ -124,6 +122,10 @@ public void foundInterface(InetAddress addr, Object descr, final PrimaryType pri
//FIXME: verify this doesn't conflict with constructor. The constructor already adds this
//interface to the node.
m_node.addIpInterface(m_currentInterface);

if (System.getProperty("org.opennms.provisiond.reverseResolveRequisitionIpInterfaceHostnames", "true").equalsIgnoreCase("true")) {
dnsLookups.add(CompletableFuture.supplyAsync(() -> getProvisionService().getHostnameResolver().getHostname(addr, m_node.getLocation().getLocationName())).thenAccept(s -> m_node.getInterfaceWithAddress(addr).setIpHostName(s)));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*******************************************************************************
* This file is part of OpenNMS(R).
*
* Copyright (C) 2023 The OpenNMS Group, Inc.
* OpenNMS(R) is Copyright (C) 1999-2023 The OpenNMS Group, Inc.
*
* OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.
*
* OpenNMS(R) is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* OpenNMS(R) is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with OpenNMS(R). If not, see:
* http://www.gnu.org/licenses/
*
* For more information contact:
* OpenNMS(R) Licensing <[email protected]>
* http://www.opennms.org/
* http://www.opennms.com/
*******************************************************************************/

package org.opennms.netmgt.provision.service;

import static org.awaitility.Awaitility.await;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.opennms.core.utils.InetAddressUtils;
import org.opennms.netmgt.model.PrimaryType;
import org.opennms.netmgt.provision.persist.requisition.Requisition;
import org.opennms.netmgt.provision.persist.requisition.RequisitionInterface;
import org.opennms.netmgt.provision.persist.requisition.RequisitionNode;
import org.opennms.netmgt.provision.service.operations.ImportOperationsManager;
import org.opennms.netmgt.provision.service.operations.InsertOperation;
import org.opennms.netmgt.snmp.proxy.LocationAwareSnmpClient;

import com.google.common.collect.Lists;

public class RequisitionAccountantTest {
private boolean blocked = false;

private RequisitionAccountant createRequisitionAccountant() {
final ProvisionService provisionService = Mockito.mock(ProvisionService.class);
final LocationAwareSnmpClient locationAwareSnmpClient = Mockito.mock(LocationAwareSnmpClient.class);
Mockito.when(provisionService.getLocationAwareSnmpClient()).thenReturn(locationAwareSnmpClient);
Mockito.when(provisionService.getHostnameResolver()).thenReturn((addr, location) -> {
while (blocked) {
try {
Thread.sleep(250);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return location + InetAddressUtils.str(addr);
});

final ImportOperationsManager importOperationsManager = Mockito.mock(ImportOperationsManager.class);
Mockito.when(importOperationsManager.foundNode("node1", "node1", "MINION", "", "", "monitorKey")).thenReturn(new InsertOperation("foreignSource", "node1", "node1", "MINION", "","", provisionService, "monitorKey"));
Mockito.when(importOperationsManager.foundNode("node2", "node2", "MINION", "", "", "monitorKey")).thenReturn(new InsertOperation("foreignSource", "node2", "node1", "MINION", "","", provisionService, "monitorKey"));
Mockito.when(importOperationsManager.foundNode("node3", "node3", "MINION", "", "", "monitorKey")).thenReturn(new InsertOperation("foreignSource", "node3", "node1", "MINION", "","", provisionService, "monitorKey"));

return new RequisitionAccountant(importOperationsManager, "monitorKey");
}

private RequisitionNode createNode(final String foreignId, final String ... ipAddresses) {
final RequisitionNode requisitionNode = new RequisitionNode();
requisitionNode.setLocation("MINION");
requisitionNode.setNodeLabel(foreignId);
requisitionNode.setForeignId(foreignId);
requisitionNode.setBuilding("");
requisitionNode.setCity("");

final List<RequisitionInterface> requisitionInterfaces = new ArrayList<>();

for(final String ipAddress : ipAddresses) {
final RequisitionInterface requisitionInterface = new RequisitionInterface();
requisitionInterface.setIpAddr(ipAddress);
requisitionInterface.setManaged(true);
requisitionInterface.setStatus(1);
requisitionInterface.setSnmpPrimary(requisitionInterfaces.size() == 0 ? PrimaryType.PRIMARY : PrimaryType.SECONDARY);
requisitionInterfaces.add(requisitionInterface);
}

requisitionNode.setInterfaces(requisitionInterfaces);

return requisitionNode;
}

private Requisition createRequisition() {
final Requisition requisition = new Requisition();
requisition.setForeignSource("foreignSource");
requisition.setDate(new Date());

requisition.setNodes(Lists.newArrayList(
createNode("node1", "10.32.29.11"),
createNode("node2", "10.32.30.22", "10.32.29.22"),
createNode("node3", "10.32.29.33", "10.32.30.33", "10.32.28.33")
));

return requisition;
}

@Test
public void testRequisitionAccountant() {
blocked = true;

// create requisition with three nodes, first with one, second with two, third with three interfaces
final Requisition requisition = createRequisition();

// create visitor
final RequisitionAccountant requisitionAccountant = createRequisitionAccountant();

// no DNS lookups scheduled since visit() wasn't called
Assert.assertEquals(0, requisitionAccountant.dnsLookupsTotal());
Assert.assertEquals(0, requisitionAccountant.dnsLookupsPending());
Assert.assertEquals(0, requisitionAccountant.dnsLookupsCompleted());

final CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
requisition.visit(requisitionAccountant);
}
});

// wait for lookups to be scheduled
await().atMost(1, TimeUnit.MINUTES).until(() -> {
return requisitionAccountant.dnsLookupsTotal() == 6;
});

// check for all six lookups
Assert.assertEquals(6, requisitionAccountant.dnsLookupsTotal());
Assert.assertEquals(6, requisitionAccountant.dnsLookupsPending());
Assert.assertEquals(0, requisitionAccountant.dnsLookupsCompleted());

// unblock
blocked = false;

// wait till lookups are completed
await().atMost(1, TimeUnit.MINUTES).until(() -> requisitionAccountant.dnsLookupsPending() == 0);

Assert.assertEquals(6, requisitionAccountant.dnsLookupsTotal());
Assert.assertEquals(0, requisitionAccountant.dnsLookupsPending());
Assert.assertEquals(6, requisitionAccountant.dnsLookupsCompleted());
}
}
Loading

0 comments on commit 6505114

Please sign in to comment.