-
Notifications
You must be signed in to change notification settings - Fork 10
/
DBContentsProvider.py
95 lines (68 loc) · 3.29 KB
/
DBContentsProvider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from joern.all import JoernSteps
class DBContentsProvider:
    """Query facade over a Joern database reached through ``JoernSteps``.

    Each public method builds a Gremlin query string and hands it to
    ``self.j.runGremlinQuery``. The ``generate*`` methods consume the
    results and return nothing; the ``get*`` methods return the results
    as a list.
    """

    def __init__(self):
        self._initDatabaseConnection()

    def _initDatabaseConnection(self):
        """Create the ``JoernSteps`` handle used by every query method.

        Connects to the database and calls ``addStepsDir('steps/')``;
        presumably that directory holds the custom steps the queries
        below refer to -- confirm against the repository layout.
        """
        self.j = JoernSteps()
        self.j.connectToDatabase()
        self.j.addStepsDir('steps/')

    # Query helpers

    def _drain(self, query):
        """Run ``query`` and iterate over its results, discarding them."""
        for _ in self.j.runGremlinQuery(query):
            pass

    def _collect(self, query):
        """Run ``query`` and return its results as a list."""
        return list(self.j.runGremlinQuery(query))

    def _scatter(self, call):
        """Wrap ``call`` in the transform/scatter pipeline and collect it.

        ``call`` is the already-formatted step invocation, for example
        ``"getAllConditions()"``. The trailing space of the query is kept
        so the string sent to the server is unchanged.
        """
        return self._collect("_().transform{ %s }.scatter() " % call)

    def generate(self, selector):
        """Generate contents for the given selector.

        Per the original author's note, this overwrites the contents
        currently held in memory by the server.

        ``selector`` is inserted verbatim into the query, so it must be
        a Gremlin expression (e.g. ``'getCallsTo("f")._()'``).
        """
        query = "generateTaintLearnStructures(%s.id.toList())\n_()" % selector
        self._drain(query)

    def generateChecksForInvocations(self, invocs):
        """Run ``generateChecksForInvocations`` for ``invocs``.

        ``invocs`` is inserted verbatim into the query in front of
        ``.toList()``; the results are consumed and discarded.
        """
        query = "generateChecksForInvocations(%s.toList())\n_()" % invocs
        self._drain(query)

    # Source Analysis

    def getSourceAPISymbols(self):
        """Return the results of the ``getSourceAPISymbols()`` step."""
        return self._scatter("getSourceAPISymbols()")

    def getAllDefStmtsPerArg(self):
        """Return the results of the ``getAllDefStmtsPerArg()`` step."""
        return self._scatter("getAllDefStmtsPerArg()")

    # Condition Analysis

    def getAllChecksPerArg(self):
        """Return the results of the ``getAllChecksPerArg()`` step."""
        return self._scatter("getAllChecksPerArg()")

    def getAllConditions(self):
        """Return the results of the ``getAllConditions()`` step."""
        return self._scatter("getAllConditions()")

    def getAllConditionsCode(self):
        """Return the results of the ``getAllConditionsCode()`` step."""
        return self._scatter("getAllConditionsCode()")

    def getInvocationCallSiteIds(self):
        """Return the results of the ``getInvocationCallSites()`` step."""
        return self._scatter("getInvocationCallSites()")

    def getSubConditions(self, nodeId):
        """Return the results of ``subConditions(nodeId)``."""
        return self._scatter("subConditions(%s)" % nodeId)

    def getAllCndFeatureVectors(self, invocs=None, argNum=None):
        """Return condition feature vectors, optionally narrowed.

        With an empty/omitted ``invocs`` the ``getAllCndFeatureVectors``
        step is queried; otherwise ``getCndFeatureVectorsForInvocs`` is
        queried with ``invocs`` inserted verbatim. When ``argNum`` is
        given it is passed to the step as an integer argument.

        The default for ``invocs`` is ``None`` rather than a shared list
        object; ``None`` and ``[]`` select the same branch.
        """
        # argNum is tested against None explicitly because 0 is a
        # legitimate argument number.
        if invocs:
            if argNum is not None:
                call = "getCndFeatureVectorsForInvocs(%s, %d)" % (invocs, argNum)
            else:
                call = "getCndFeatureVectorsForInvocs(%s)" % invocs
        elif argNum is not None:
            call = "getAllCndFeatureVectors(%d)" % argNum
        else:
            call = "getAllCndFeatureVectors()"
        return self._scatter(call)

    def getAllASTNodeLabels(self):
        """Return the results of the ``getAllASTNodeLabels()`` step."""
        return self._scatter("getAllASTNodeLabels()")

    # Choosing sinks

    def getControlledSinks(self, nodeId):
        """Return the results of ``getControlledSinks(nodeId)``."""
        return self._scatter("getControlledSinks(%s)" % nodeId)
# Manual smoke run: build the learn structures for every call to
# TIFFFetchData, then print what getSourceAPISymbols() returns.
if __name__ == '__main__':
    gen = DBContentsProvider()
    # Alternative selector that addresses a single node by id:
    # gen.generate('g.v(12798)._()')
    gen.generate('getCallsTo("TIFFFetchData")._()')
    for x in gen.getSourceAPISymbols():
        # Parenthesised single-argument form prints identically under
        # Python 2 and is also valid Python 3.
        print(x)