Friday, September 30, 2016

Python Multithreaded Asset Downloader

We are getting towards the end of production on El Viaje Imposible's teaser, a mixed 3D/live-action project I've been working on for the last few months along with other fantastic and experienced co-workers.

For as long as we have been developing the pipeline and artists have been using it, some kind of issue has come up every day, and lately the problems have been due to the HTTP protocol we used for transfers. HTTP was chosen at the very beginning, with the idea of reviewing the different upload methods our pipeline allows later on, once the need to send massive amounts of files arose.

Well, that time has come. Cloth and Hair artists are already working, and in order to pass their work on to the lighters they need to export cache files. Our hair plugin generates a cache file per frame (one can even choose inter-frame caching, e.g. to avoid flickering), there may be a couple of plugin nodes that read/export caches, and all of that is multiplied by the number of characters in a shot. That adds up to hundreds, if not thousands, of files to be sent to the server. Hence the need for a good, bulletproof protocol.

I forgot to say: a lot of artists are working remotely, with all the inconveniences this implies, you see.

This week we have improved our check-in/check-out pipeline a lot. We no longer use HTTP; we now rely on Samba, since our audiovisual project management system allows it.

For my part, one of the improvements I've made this week is to "parallelize" the asset downloader tool. The first release ran a single background thread and downloaded each pipeline task's assets sequentially.

This became unbearable as we got deeper into production, since the more advanced tasks depended on all the previous ones. To start a task, an artist had to wait until more than a hundred tasks had been checked, which could take as long as 10 minutes of sitting with crossed arms.

IMPLEMENTATION

The goal was to replace the sequential background thread with a configurable number of independent threads, each in charge of checking the assets of a single task. For this, we define a Job class that is responsible for holding its own connection through the TACTIC API, together with all the metadata needed to tell TACTIC which assets it is looking for.
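
Just to make the idea concrete, here is a minimal sketch of what such a Job could look like. The TacticServerStub constructor arguments and the checkout() call are assumptions based on the public TACTIC client API; the real tool passes more metadata (versions, file types, and so on).

from tactic_client_lib import TacticServerStub


class Job(object):
    '''One unit of work: its own TACTIC connection plus the metadata
    describing which task's assets to fetch.'''

    def __init__(self, server_url, project, ticket, search_key, context, to_dir):
        # one stub per job, so no two threads ever share a connection
        # (constructor arguments are illustrative, adapt them to your setup)
        self.server = TacticServerStub(server=server_url, project=project,
                                       ticket=ticket)
        self.search_key = search_key   # the sobject (task) whose assets we want
        self.context = context         # e.g. "cache/hair" or "cache/cloth"
        self.to_dir = to_dir           # local destination directory

    def execute(self):
        # ask TACTIC to copy this task's snapshot files down to to_dir;
        # treat the exact checkout() signature as an assumption
        return self.server.checkout(self.search_key, context=self.context,
                                    to_dir=self.to_dir)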

Then we define a Worker class whose instances share a thread-safe Queue. Each Worker keeps asking for the next job as long as there are still jobs in the queue. This is actually a variant of the Producer/Consumer problem where we fill the queue with jobs from the beginning, so there is no need for a producer thread.


import Queue   # Python 2 standard library ("queue" in Python 3)

# QtCore comes from whatever Qt binding Maya ships with (PySide/PyQt4)

class Worker(QtCore.QThread):

    '''
    define different signals to emit
    '''

    def __init__(self, queue, report):
        QtCore.QThread.__init__(self)
        self.queue = queue     # thread-safe Queue.Queue shared by all workers
        self.abort = False     # sentinel flag checked on every iteration
        '''
        rest of variables
        '''

    def run(self):
        # keep pulling jobs until we are aborted or the queue runs dry
        while not self.abort and not self.queue.empty():
            try:
                # non-blocking get: another worker may have grabbed the last
                # job between the empty() check and this call
                job = self.queue.get_nowait()
            except Queue.Empty:
                break

            try:
                response = job.execute()
            except Exception as e:
                # report/handle the error
                process(e)

            self.queue.task_done()
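
For context, this is roughly how the dialog fills the queue and spawns the workers. The start_download() name, the num_threads default and self.report are placeholders rather than our exact code:

    def start_download(self, jobs, num_threads=8):
        # every Job goes into the thread-safe queue up front,
        # so there is no need for a producer thread
        self.queue = Queue.Queue()
        for job in jobs:
            self.queue.put(job)

        # spawn the workers; each one consumes jobs until the queue runs dry
        self.threads = []
        for _ in range(num_threads):
            worker = Worker(self.queue, self.report)
            worker.start()
            self.threads.append(worker)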


One of the problems left, then, is how to shut down all the threads when closing the QDialog. I had quite a hard time figuring out the best way of doing it.
Googling a bit, people ask the same question whenever a thread runs a while True sort of loop. Most people agree that the most elegant way is to use a "sentinel" (sometimes loosely called a semaphore), which is nothing more than a boolean checked on every iteration. The boolean can be set from outside the thread, so the next time the loop comes around it will jump out.

Another possibility is to put a None "job" object in the queue, so that immediately after retrieving it from the queue the thread checks its value and exits accordingly. This works for a single thread; if we spawn 10 threads we need to put 10 None jobs in the queue.
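
A rough sketch of that "poison pill" alternative (not the route I took):

# on shutdown, from the dialog: one poison pill per spawned worker
for _ in self.threads:
    self.queue.put(None)

# and inside Worker.run(), checking right after the get():
while True:
    job = self.queue.get()
    if job is None:            # poison pill: this worker stops here
        self.queue.task_done()
        break
    try:
        response = job.execute()
    except Exception as e:
        process(e)
    self.queue.task_done()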

This leaves the question: how do you terminate one specific thread? It's not needed here, but it's something to think about later...

I resorted to the first, more elegant solution; that's the reason for self.abort. Here is the code that overrides closeEvent():


    def closeEvent(self, *args, **kwargs):
        # raise the sentinel on every worker that is still running
        for t in self.threads:
            if t.isRunning():
                t.abort = True

        # give the workers a moment to notice the sentinel and exit cleanly
        import time
        time.sleep(2)

        # whatever is still alive at this point gets terminated
        for t in self.threads:
            t.terminate()

        return QtGui.QDialog.closeEvent(self, *args, **kwargs)


As you can see, before closing we set each thread's sentinel to True. The interesting thing about this code is that if you try to terminate the threads immediately afterwards (I'm not going to discuss here the correctness of terminating/killing a thread), the window hangs. I'm not sure why this happens. All we need to do is sleep() for a sufficient amount of time to give every thread the chance to get out of job.execute() and check the sentinel.
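
A small variant I might try, assuming QThread.wait() (which takes a timeout in milliseconds and returns False if the thread is still running), is to wait on each worker individually instead of doing a blind sleep:

    def closeEvent(self, *args, **kwargs):
        for t in self.threads:
            if t.isRunning():
                t.abort = True

        # wait up to 2 seconds per thread; terminate only the stragglers
        for t in self.threads:
            if not t.wait(2000):
                t.terminate()

        return QtGui.QDialog.closeEvent(self, *args, **kwargs)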

My only concern with this solution is: what happens if one of the threads is downloading, say, 1 GB of data? Would 2 seconds, as in the example, be enough time for it to reach the sentinel check and exit?

That's why I would really like to tweak the TACTIC API and get low-level access to each downloaded chunk of data, for example in 10 MB slices; then this problem would disappear... but for now I'm stuck with the API and its interface.


IS IT REALLY PARALLEL?

Well, not really. This same code in C or C++ would run fully in parallel, but here we bump into the GIL, the Global Interpreter Lock of the MayaPy and CPython interpreters. You can find plenty of posts about it with a quick search. Basically, the GIL is a mechanism that prevents more than one thread from executing Python bytecode at the same time, so that the interpreter's internal memory does not get corrupted. The saving grace for a downloader is that the GIL is released during blocking I/O, which is why multiple threads still speed up network transfers.

If we want full parallelization, we should go for multiprocessing, which differs from multithreading in that each spawned process has its own memory space. This is ideal when you don't need to share objects between processes, or the need is small. On top of that, many benchmarks people have run conclude that in Python, multithreading tends to take more time on CPU-bound tasks than the same code running in a single thread.

So if your task is CPU-expensive, try multiprocessing rather than multithreading. But if your tasks are I/O-bound, networking, etc. (as is the case here), I find multithreading more suitable.

Nevertheless, I would like to give the code another spin and run benchmarks, this time using the multiprocessing module.
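
As a rough idea of what that experiment could look like (download_task() and the metadata list are hypothetical; live TACTIC connections don't pickle, so each process would open its own):

import multiprocessing


def download_task(task_metadata):
    # each process would open its own TACTIC connection here and check out
    # the assets described by task_metadata; placeholder body for the sketch
    print('downloading assets for %s' % task_metadata)
    return task_metadata


if __name__ == '__main__':
    # hypothetical per-task descriptions
    all_task_metadata = ['shot010/hair', 'shot010/cloth', 'shot020/hair']

    pool = multiprocessing.Pool(processes=4)
    results = pool.map(download_task, all_task_metadata)   # blocks until done
    pool.close()
    pool.join()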