Rewriting podgrabber, part 4 - podgrabber is now threaded...again

by Jeremy Jones

This is part 4 of an N-part series on rewriting my podgrabber application. Here are links to part one, part two, and part three. In part 3, I outlined my strategy for synchronizing between mediaStores. This post updates that strategy slightly to show how I'm now handling threading.

For the curious, the code lives in a Bazaar repository at http://bzr.podgrabber.org/trunk/

The SyncManager now takes a taskManager in its constructor.


class SyncManager(object):
    """This is a concrete implementation of a synchronization manager which is
    intended to be subclassed if necessary.

    A SyncManager connects two mediaStores with filters and processing steps.
    It should be able to copy files from the fromStore to the toStore, exclude
    any files which were filtered out, and execute any processingSteps along
    the way.
    """
    def __init__(self, fromStore, toStore, copyFilters, deleteFilters,
            preProcessingSteps, postProcessingSteps, taskManager):
        self.fromStore = fromStore
        self.toStore = toStore
        self.copyFilters = copyFilters
        self.deleteFilters = deleteFilters
        self.preProcessingSteps = preProcessingSteps
        self.postProcessingSteps = postProcessingSteps
        self.taskManager = taskManager
        self._init()
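
Just to show the wiring, here's roughly how one gets built now. The store objects below are placeholders rather than the real mediaStore constructors, and the filter and processing-step lists are left empty:

# Placeholder wiring -- fromStore/toStore here are stand-ins, not the
# actual mediaStore implementations.
taskManager = TaskManager(numCopyThreads=5, numDeleteThreads=2)
syncManager = SyncManager(fromStore, toStore,
                          copyFilters=[], deleteFilters=[],
                          preProcessingSteps=[], postProcessingSteps=[],
                          taskManager=taskManager)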


When copying a file, the SyncManager now pushes the request to the task manager:


def syncCopy(self):
    for mediaFile in self.getCopyList():
        print "ADDING MEDIA FILE", mediaFile
        logger.info("Copying file %s" % mediaFile)
        self.taskManager.addCopyFile(mediaFile, self.toStore,
                self.preProcessingSteps, self.postProcessingSteps)



Here is the task manager code in its entirety:

from Queue import Queue
import thread
import threading
import time

import logging
logger = logging.getLogger("podgrabber.syncTaskManager")


class Shutdown(object):
    # Sentinel placed on a queue to tell a worker thread to exit.
    pass

class CopyWorker(threading.Thread):
    def __init__(self, q, fileDict):
        self.q = q
        self.fileDict = fileDict
        threading.Thread.__init__(self)
    def run(self):
        #print "Running copy thread", self.getName()
        logger.info("Running")
        while 1:
            logger.debug("Blocking while pulling items from queue")
            mediaFile, mediaStore, preProc, postProc = self.q.get()
            if type(mediaFile) == Shutdown:
                #print "Break"
                logger.info("Shutting down")
                break
            logger.debug("Retrieved items from queue")
            for preProcessingStep in preProc:
                mediaFile = preProcessingStep.process(mediaFile)
            logger.debug("Retrieving file %s" % mediaFile)
            mediaStore.addFile(mediaFile)
            logger.debug("Done retrieving file %s" % mediaFile)
            for postProcessingStep in postProc:
                mediaFile = postProcessingStep.process(mediaFile)
            # Record the completion time next to the start time stored in addCopyFile().
            self.fileDict[mediaFile][1] = time.time()

class TaskManager(object):
    def __init__(self, numCopyThreads=5, numDeleteThreads=2):
        self.copyQueue = Queue()
        self.deleteQueue = Queue()
        self.numCopyThreads = numCopyThreads
        self.numDeleteThreads = numDeleteThreads
        self.threadList = []
        self.fileDict = {}
        for i in range(numCopyThreads):
            #thread.start_new_thread(self._copyFile, ())
            copyWorker = CopyWorker(self.copyQueue, self.fileDict)
            copyWorker.setDaemon(True)
            copyWorker.start()
            self.threadList.append(copyWorker)
        for i in range(numDeleteThreads):
            pass
    def addCopyFile(self, mediaFile, mediaStore, preProc, postProc):
        self.fileDict[mediaFile] = [time.time(), None]
        self.copyQueue.put((mediaFile, mediaStore, preProc, postProc))
    def addDeleteFile(self, mediaFile, mediaStore, preProc, postProc):
        self.deleteQueue.put((mediaFile, mediaStore, preProc, postProc))
    def _copyFile(self):
        while 1:
            mediaFile, mediaStore, preProc, postProc = self.copyQueue.get()
            if type(mediaFile) == Shutdown:
                break
            for preProcessingStep in preProc:
                mediaFile = preProcessingStep.process(mediaFile)
            mediaStore.addFile(mediaFile)
            for postProcessingStep in postProc:
                mediaFile = postProcessingStep.process(mediaFile)
    def _deleteFile(self):
        pass
    def shutdown(self):
        # Queue one Shutdown sentinel per worker, then wait for them all to finish.
        for i in range(self.numCopyThreads):
            self.copyQueue.put((Shutdown(), None, None, None))
        for i in range(self.numDeleteThreads):
            self.deleteQueue.put((Shutdown(), None, None, None))
        for t in self.threadList:
            t.join()


Basically, the task manager creates one queue for copies and one for deletes, plus a pool of worker threads for each operation. When the sync manager passes a copy request to the task manager, the call is non-blocking. The downloading and processing of files happens N files at a time, where N is the number of threads you've allowed to be active; the defaults are 5 for copying and 2 for deleting.
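
To make the flow concrete, here's a minimal sketch of driving the TaskManager directly. DummyStore and the mp3 filenames are just stand-ins for the real mediaStore and mediaFile objects:

# Minimal sketch -- DummyStore and the filenames stand in for the real
# mediaStore and mediaFile objects.
import time

class DummyStore(object):
    def addFile(self, mediaFile):
        time.sleep(1)                     # pretend to download the file
        print "downloaded", mediaFile

taskManager = TaskManager(numCopyThreads=5, numDeleteThreads=2)
store = DummyStore()
for name in ["show1.mp3", "show2.mp3", "show3.mp3"]:
    # addCopyFile() returns immediately; the download happens on a worker thread
    taskManager.addCopyFile(name, store, [], [])
taskManager.shutdown()                    # blocks until queued copies finish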

So far, this seems to be working pretty well. The only thing I see that could use some immediate improvement is to either thread the downloading of the RSS feed(s), or to use Doug's feedcache, or both. I'm going to try to get that supported over the weekend.
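
For the threaded feed fetching, I'm picturing something along the lines of the sketch below, reusing the same Queue/worker pattern with feedparser; the thread count and result handling are placeholders since none of this is written yet:

# Rough sketch of fetching feeds in parallel with the same Queue/worker
# pattern.  Error handling is omitted.
from Queue import Queue
import threading
import feedparser

def feedWorker(urlQueue, resultQueue):
    while 1:
        url = urlQueue.get()
        if url is None:                    # None is the shutdown sentinel here
            break
        resultQueue.put(feedparser.parse(url))

def fetchFeeds(urls, numThreads=3):
    urlQueue, resultQueue = Queue(), Queue()
    workers = [threading.Thread(target=feedWorker, args=(urlQueue, resultQueue))
               for i in range(numThreads)]
    for w in workers:
        w.setDaemon(True)
        w.start()
    for url in urls:
        urlQueue.put(url)
    for w in workers:
        urlQueue.put(None)                 # one sentinel per worker
    for w in workers:
        w.join()
    return [resultQueue.get() for i in range(len(urls))]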

The next piece of functionality to add is a GUI. Based on the feedback I received on a post today, I'm going to have to check out wxPython.