Python multiprocessing with pool workers - memory use optimization
I have a fuzzy string matching script that looks for some 30K needles in a
haystack of 4 million company names. While the script works fine, my
attempts at speeding things up via parallel processing on an AWS h1.xlarge
failed because I run out of memory.
Rather than trying to get more memory, as suggested in response to my
previous question, I'd like to find out how to optimize the workflow - I'm
fairly new to this, so there should be plenty of room. By the way, I've
already experimented with queues (they also worked, but ran into the same
MemoryError), and I've looked through a bunch of very helpful SO
contributions, but I'm not quite there yet.
Here's the part of the code that seems most relevant. I hope it
sufficiently clarifies the logic - happy to provide more info as needed:
import itertools
from collections import defaultdict
from multiprocessing import Pool

import Levenshtein as levi   ## assuming python-Levenshtein's setratio here
from pandas import isnull    ## assuming pandas' isnull here

numProcesses = 4             ## e.g. number of cores

def getHayStack():
    ## loads a few million company names into an id: name dict
    return hayCompanies

def getNeedles(*args):
    ## loads a subset of the 30K companies into an id: name dict
    ## (for allocation to workers)
    return needleCompanies

def findNeedle(needle, haystack):
    """ Identify the best match in the haystack and return it with its score """
    results = {}
    for hayID, hayCompany in haystack.iteritems():
        if not isnull(hayCompany):
            results[hayID] = levi.setratio(needle.split(' '),
                                           hayCompany.split(' '))
    scores = list(results.values())
    resultIDs = list(results.keys())
    bestID = resultIDs[scores.index(max(scores))]
    return [bestID, haystack[bestID], max(scores)]

def runMatch(args):
    """ Execute findNeedle and process results for one poolWorker batch """
    batch, first = args
    last = first + batch
    hayCompanies = getHayStack()
    needleCompanies = getNeedles(first, last)
    needles = defaultdict(list)
    current = first
    for needleID, needleCompany in needleCompanies.iteritems():
        current += 1
        needles[needleID] = findNeedle(needleCompany, hayCompanies)
    ## Then store results

if __name__ == '__main__':
    pool = Pool(processes=numProcesses)
    totalNeedles = len(getNeedles('all'))
    needlesPerBatch = totalNeedles / numProcesses
    pool.map_async(runMatch,
                   itertools.izip(itertools.repeat(needlesPerBatch),
                                  xrange(0, totalNeedles,
                                         needlesPerBatch))).get(99999999)
    pool.close()
    pool.join()
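One piece of clutter I already suspect: findNeedle builds a results dict
with one score per haystack entry (so roughly 4 million floats per needle)
just to pick the maximum at the end. Would replacing it with something that
only keeps the running best, along these lines, make a meaningful
difference? (Untested sketch; findNeedleLean is just an illustrative name,
the matching logic is meant to be the same:)

def findNeedleLean(needle, haystack):
    """ Same matching as findNeedle, but track only the running best
        instead of building a full results dict per needle (sketch). """
    needleTokens = needle.split(' ')          ## split the needle once, not per comparison
    bestID, bestScore = None, -1.0
    for hayID, hayCompany in haystack.iteritems():
        if isnull(hayCompany):
            continue
        score = levi.setratio(needleTokens, hayCompany.split(' '))
        if score > bestScore:
            bestID, bestScore = hayID, score
    return [bestID, haystack[bestID], bestScore]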
So I guess the questions are: How can I avoid loading the haystack for
every worker - e.g. by sharing the data (as in the sketch below), or by
taking a different approach such as dividing the much larger haystack
across the workers rather than the needles? And how can I otherwise improve
memory usage by avoiding or eliminating clutter, beyond the findNeedle
change sketched above?
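On the first question, would something along these lines be a sensible way
to share the haystack instead of reloading it in every runMatch call? The
idea is to build it once in the parent before creating the pool (so forked
workers inherit it) and to fall back to loading it once per worker via a
pool initializer. HAYSTACK, initWorker and runMatchShared are just
illustrative names, and I haven't tested this:

HAYSTACK = None

def initWorker():
    ## Runs once per worker process; loads the haystack only if it wasn't
    ## already inherited from the parent via fork.
    global HAYSTACK
    if HAYSTACK is None:
        HAYSTACK = getHayStack()

def runMatchShared(args):
    ## Like runMatch, but uses the shared module-level haystack.
    batch, first = args
    needleCompanies = getNeedles(first, first + batch)
    results = {}
    for needleID, needleCompany in needleCompanies.iteritems():
        results[needleID] = findNeedle(needleCompany, HAYSTACK)
    return results

if __name__ == '__main__':
    HAYSTACK = getHayStack()        ## built once in the parent, before the fork
    pool = Pool(processes=numProcesses, initializer=initWorker)
    ## ... then map runMatchShared over the same (batch, first) tuples as before

My understanding is that on Linux the fork makes this copy-on-write, so the
dict isn't pickled and re-sent for every task (though I gather refcounting
can still dirty pages over time) - is that roughly right, or is there a
better way to share it?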