Skip to content

Commit

Permalink
ThreadMonitor : Add class for monitoring threads used for processes
Browse files Browse the repository at this point in the history
  • Loading branch information
johnhaddon committed Aug 16, 2023
1 parent 04dfe1e commit f076d9d
Show file tree
Hide file tree
Showing 7 changed files with 439 additions and 0 deletions.
5 changes: 5 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ Fixes

- Viewer : Fixed crash when visualising lights with a light filter intended for a different renderer.

API
---

- ThreadMonitor : Added new class for tracking the threads used to perform processes.

Documentation
-------------

Expand Down
108 changes: 108 additions & 0 deletions include/Gaffer/ThreadMonitor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above
// copyright notice, this list of conditions and the following
// disclaimer.
//
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following
// disclaimer in the documentation and/or other materials provided with
// the distribution.
//
// * Neither the name of John Haddon nor the names of
// any other contributors to this software may be used to endorse or
// promote products derived from this software without specific prior
// written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
//////////////////////////////////////////////////////////////////////////

#pragma once

#include "Gaffer/Monitor.h"

#include "tbb/enumerable_thread_specific.h"

#include <unordered_map>

namespace Gaffer
{

IE_CORE_FORWARDDECLARE( Plug )

/// A monitor which collects information about which threads
/// initiated processes on each plug.
class GAFFER_API ThreadMonitor : public Monitor
{

public :

ThreadMonitor( const std::vector<IECore::InternedString> &processMask = { "computeNode:compute" } );
~ThreadMonitor() override;

IE_CORE_DECLAREMEMBERPTR( ThreadMonitor )

/// Numeric identifier for a thread. Using our own identifier rather
/// than `std::thread::id` so that we can bind it to Python (and assign
/// human-readable contiguous values).
using ThreadId = int;
/// Returns the `ThreadId` for the calling thread.
static ThreadId thisThreadId();
/// Maps from `ThreadId` to the number of times a process has been
/// invoked on that thread.
using ProcessesPerThread = std::unordered_map<ThreadId, size_t>;
/// Stores per-thread process counts per-plug.
using PlugMap = std::unordered_map<ConstPlugPtr, ProcessesPerThread>;

/// Query functions. These are not thread-safe, and must be called
/// only when the Monitor is not active (as defined by `Monitor::Scope`).
const PlugMap &allStatistics() const;
const ProcessesPerThread &plugStatistics( const Plug *plug ) const;
const ProcessesPerThread &combinedStatistics() const;

protected :

void processStarted( const Process *process ) override;
void processFinished( const Process *process ) override;

private :

const std::vector<IECore::InternedString> m_processMask;

// We collect statistics into a per-thread data structure to avoid contention.
struct ThreadData
{
ThreadData();
using ProcessesPerPlug = std::unordered_map<ConstPlugPtr, size_t>;
ThreadId id;
ProcessesPerPlug processesPerPlug;
};
mutable tbb::enumerable_thread_specific<ThreadData> m_threadData;

// Then when we want to query it, we collate it into `m_statistics`.
void collate() const;
mutable PlugMap m_statistics;
mutable ProcessesPerThread m_combinedStatistics;

};

IE_CORE_DECLAREPTR( ThreadMonitor )

} // namespace Gaffer
144 changes: 144 additions & 0 deletions python/GafferTest/ThreadMonitorTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
##########################################################################
#
# Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided with
# the distribution.
#
# * Neither the name of John Haddon nor the names of
# any other contributors to this software may be used to endorse or
# promote products derived from this software without specific prior
# written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
##########################################################################

import threading
import unittest

import IECore

import Gaffer
import GafferTest

class ThreadMonitorTest( GafferTest.TestCase ) :

def testConstruction( self ) :

monitor = Gaffer.ThreadMonitor()
self.assertEqual( monitor.allStatistics(), {} )
self.assertEqual( monitor.plugStatistics( Gaffer.IntPlug() ), {} )
self.assertEqual( monitor.combinedStatistics(), {} )

def testThisThreadId( self ) :

id = Gaffer.ThreadMonitor.thisThreadId()
self.assertEqual( id, Gaffer.ThreadMonitor.thisThreadId() )

ids = { id }
lock = threading.Lock()

def storeId() :
id = Gaffer.ThreadMonitor.thisThreadId()
self.assertEqual( id, Gaffer.ThreadMonitor.thisThreadId() )
with lock :
ids.add( id )

threads = []
for i in range( 0, 5 ) :
thread = threading.Thread( target = storeId )
threads.append( thread )
thread.start()

for thread in threads :
thread.join()

self.assertEqual( len( ids ), 6 )

def testMonitoring( self ) :

random = Gaffer.Random()
monitor = Gaffer.ThreadMonitor()

with monitor :
random["outFloat"].getValue()

self.assertEqual(
monitor.allStatistics(),
{
random["outFloat"] : {
monitor.thisThreadId() : 1
}
}
)
self.assertEqual(
monitor.plugStatistics( random["outFloat"] ),
{ monitor.thisThreadId() : 1 }
)
self.assertEqual(
monitor.combinedStatistics(),
{ monitor.thisThreadId() : 1 }
)

random["seedVariable"].setValue( "test" )
with monitor :
GafferTest.parallelGetValue( random["outFloat"], 100000, "test" )

s = monitor.plugStatistics( random["outFloat"] )
self.assertEqual( len( s ), IECore.tbb_global_control.active_value( IECore.tbb_global_control.parameter.max_allowed_parallelism ) )
self.assertEqual( sum( s.values() ), 100001 )

self.assertEqual( monitor.allStatistics(), { random["outFloat"] : s } )
self.assertEqual( monitor.combinedStatistics(), s )

def testProcessMask( self ) :

for processType in [ "computeNode:hash", "computeNode:value" ] :

with self.subTest( processType = processType ) :

Gaffer.ValuePlug.clearCache()
Gaffer.ValuePlug.clearHashCache()

random = Gaffer.Random()
threadMonitor = Gaffer.ThreadMonitor()
performanceMonitor = Gaffer.PerformanceMonitor()

with threadMonitor, performanceMonitor :
GafferTest.parallelGetValue( random["outFloat"], 5, "test" )

self.assertEqual(
performanceMonitor.plugStatistics( random["outFloat"] ).computeCount, 1
)
self.assertEqual(
performanceMonitor.plugStatistics( random["outFloat"] ).hashCount, 5
)

self.assertEqual(
sum( threadMonitor.plugStatistics( random["outFloat"] ).values() ),
5 if processType == "computeNode:compute" else 1
)

if __name__ == "__main__":
unittest.main()
1 change: 1 addition & 0 deletions python/GafferTest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def inCI( platforms = set() ) :
from .HiddenFilePathFilterTest import HiddenFilePathFilterTest
from .ContextVariableTweaksTest import ContextVariableTweaksTest
from .OptionalValuePlugTest import OptionalValuePlugTest
from .ThreadMonitorTest import ThreadMonitorTest

from .IECorePreviewTest import *

Expand Down
121 changes: 121 additions & 0 deletions src/Gaffer/ThreadMonitor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above
// copyright notice, this list of conditions and the following
// disclaimer.
//
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following
// disclaimer in the documentation and/or other materials provided with
// the distribution.
//
// * Neither the name of John Haddon nor the names of
// any other contributors to this software may be used to endorse or
// promote products derived from this software without specific prior
// written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
//////////////////////////////////////////////////////////////////////////

#include "Gaffer/ThreadMonitor.h"

#include "Gaffer/Plug.h"
#include "Gaffer/Process.h"

using namespace Gaffer;

namespace
{

static std::atomic<int> g_threadIdCounter = 0;
ThreadMonitor::ProcessesPerThread g_emptyStatistics;

} // namespace

ThreadMonitor::ThreadData::ThreadData()
: id( thisThreadId() )
{
}

ThreadMonitor::ThreadMonitor( const std::vector<IECore::InternedString> &processMask )
: m_processMask( processMask )
{
}

ThreadMonitor::~ThreadMonitor()
{
}

ThreadMonitor::ThreadId ThreadMonitor::thisThreadId()
{
thread_local int id = g_threadIdCounter++;
return id;
}

const ThreadMonitor::PlugMap &ThreadMonitor::allStatistics() const
{
collate();
return m_statistics;
}

const ThreadMonitor::ProcessesPerThread &ThreadMonitor::plugStatistics( const Plug *plug ) const
{
collate();
auto it = m_statistics.find( plug );
if( it == m_statistics.end() )
{
return g_emptyStatistics;
}
return it->second;
}

const ThreadMonitor::ProcessesPerThread &ThreadMonitor::combinedStatistics() const
{
collate();
return m_combinedStatistics;
}

void ThreadMonitor::processStarted( const Process *process )
{
if( std::find( m_processMask.begin(), m_processMask.end(), process->type() ) == m_processMask.end() )
{
return;
}

ThreadData &threadData = m_threadData.local();
threadData.processesPerPlug[process->plug()]++;
}

void ThreadMonitor::processFinished( const Process *process )
{
}

void ThreadMonitor::collate() const
{
for( auto &threadData : m_threadData )
{
for( const auto &[plug, count] : threadData.processesPerPlug )
{
m_statistics[plug][threadData.id] += count;
m_combinedStatistics[threadData.id] += count;
}
threadData.processesPerPlug.clear();
}
}
Loading

0 comments on commit f076d9d

Please sign in to comment.