Showing posts with the label multiprocessing. Show all posts

Friday, September 30, 2016

Python Multithreaded Asset Downloader

We are getting towards the end of the production of El Viaje Imposible's teaser, a mixed 3D/live-action project I've been working on for the last few months along with other fantastic and experienced co-workers.

For as long as we have been developing the pipeline and artists have been using it, some kind of issue came up every day, and lately the problems were due to the HTTP protocol we used for transfers. This was set up at the very beginning, with the intention of reviewing the different upload methods our pipe allows once the need to send massive amounts of files arose.

Well, that time has come. Cloth and Hair artists are already working, and in order to pass their work on to the lighters they need to export cache files. Our hair plugin generates a cache file per frame (one can even choose inter-frame caching, e.g. to avoid flickering), and there may be a couple of plugin nodes that read/export caches, multiplied by the number of characters in a shot. That adds up to hundreds if not thousands of files to be sent to the server, hence the need for a good, bulletproof protocol.

Forgot to say, a lot of artists are working remotely, with all the inconveniences this implies, you see.

This week we have improved our checkin/checkout pipeline a lot. We no longer use HTTP; we now rely on Samba, since our audiovisual project management system allows it.

For my part, one of the improvements I've made this week is to "parallelize" the asset downloader tool. The first release ran a single background thread and downloaded the assets of each pipeline task sequentially.

This became unbearable as we got deeper into production, because the more advanced tasks depended on all the previous ones. To start working on a task, an artist had to wait until more than a hundred tasks had been checked, sitting with crossed arms for as long as 10 minutes.

IMPLEMENTATION

The goal was to substitute the sequential background thread with a configurable number of independent threads, each in charge of checking the assets of a single task. For this we define a Job class that is responsible for holding its own connection through the Tactic API and all the metadata needed to tell Tactic which assets it is looking for.

Then we define our Worker class, whose instances share a thread-safe Queue. Each Worker keeps asking the queue for the current job while there are still jobs in it. This is actually a variant of the Producer/Consumer problem where we fill the queue with all the jobs from the beginning, so there is no need for a producer thread.


class Worker(QtCore.QThread):

    '''
    define the different signals to emit here
    '''
    def __init__(self, queue, report):
        QtCore.QThread.__init__(self)
        self.queue = queue
        self.report = report
        self.abort = False
        '''
        rest of the variables
        '''

    def run(self):
        while not self.abort and not self.queue.empty():
            job = self.queue.get()
            try:
                response = job.execute()
            except Exception as e:
                process(e)
            self.queue.task_done()
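For completeness, here is a minimal, runnable sketch of this consumer setup, using plain threading.Thread instead of QtCore.QThread so it works outside Qt; the Job class below is a placeholder for the real Tactic-backed one.

```python
import queue
import threading

class Job(object):
    '''Placeholder for the real Job: in the tool it holds its own Tactic
    connection plus the metadata describing the assets to check.'''
    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self):
        # the real version would query Tactic and download the assets
        return 'checked task %s' % self.task_id

class Worker(threading.Thread):
    def __init__(self, jobs):
        threading.Thread.__init__(self)
        self.jobs = jobs
        self.abort = False
        self.results = []

    def run(self):
        while not self.abort:
            try:
                # get_nowait() avoids the race between empty() and get()
                job = self.jobs.get_nowait()
            except queue.Empty:
                break
            try:
                self.results.append(job.execute())
            finally:
                self.jobs.task_done()

# fill the queue with every job up front: no producer thread needed
jobs = queue.Queue()
for task_id in range(100):
    jobs.put(Job(task_id))

# spawn a configurable number of workers sharing the same queue
workers = [Worker(jobs) for _ in range(8)]
for w in workers:
    w.start()
jobs.join()          # returns once every job is marked task_done()
for w in workers:
    w.join()
```

Using get_nowait() instead of the empty()/get() pair matters with several workers: between the empty() check and the get(), another worker may steal the last job and leave this one blocked forever.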


One of the problems left is how to shut down all the threads when closing the QDialog. I had quite a hard time figuring out the best way of doing it.
Googling a bit, people ask the same question whenever a thread runs a while True sort of loop. Most people agree that the most elegant way is to use a "sentinel" (sometimes called a semaphore): nothing more than a boolean that is checked on every iteration. The boolean can be set from outside the thread, so the next time the loop iterates, it jumps out.

Another possibility is to put a None job object in the queue, so that immediately after retrieving an item the thread checks its value and exits accordingly. This works for a single thread; if we spawn 10 threads, we have to put 10 None jobs in the queue, one per thread.
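A sketch of that variant, with one None sentinel per worker; the integer payloads here just stand in for real jobs.

```python
import queue
import threading

NUM_WORKERS = 4

def worker(jobs, done):
    while True:
        job = jobs.get()
        if job is None:        # sentinel: no more work, leave the loop
            break
        done.append(job * 2)   # stand-in for job.execute()

jobs = queue.Queue()
done = []
for i in range(10):
    jobs.put(i)
# one sentinel per worker, otherwise some threads never get told to exit
for _ in range(NUM_WORKERS):
    jobs.put(None)

threads = [threading.Thread(target=worker, args=(jobs, done))
           for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```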

This leaves the question: how do you terminate one specific thread? It's not needed here, but rather something to think about later...

I resorted to the first, more elegant solution; that's the reason for the self.abort flag. So here is the code that overrides closeEvent():


    def closeEvent(self, *args, **kwargs):

        # ask every running worker to stop at its next iteration
        for t in self.threads:
            if t.isRunning():
                t.abort = True
        # give the workers time to notice the flag before terminating
        import time
        time.sleep(2)
        for t in self.threads:
            t.terminate()

        return QtGui.QDialog.closeEvent(self, *args, **kwargs)


As you can see, before closing we set the sentinel of each thread to True. The interesting thing about this code is that if you try to terminate the threads immediately after (I'm not going to discuss here the correctness of terminating/killing a thread), the window hangs; I'm not sure why. All we need to do is sleep() for a sufficient amount of time to give every thread the chance to get out of job.execute() and check the sentinel.

My only concern with this solution is: what happens if one of the threads is downloading, say, 1 GB of data? Would 2 seconds, as in the example, be enough time for it to reach the sentinel check and exit?

That's why I would really like to tweak the Tactic API and get low-level access to each downloaded chunk of data, for example 10 MB at a time. In 10 MB slices this problem would disappear... but I'm stuck with the API and its interface for now.
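If the transfer loop were under our control, the chunked check might look like this. A hypothetical sketch: the in-memory buffers stand in for the real Tactic transfer.

```python
import io

CHUNK_SIZE = 10 * 1024 * 1024   # 10 MB slices

def download(source, destination, should_abort):
    '''Copy source into destination chunk by chunk, checking the
    sentinel between chunks so an abort is noticed within one slice.'''
    while not should_abort():
        chunk = source.read(CHUNK_SIZE)
        if not chunk:
            return True      # transfer finished cleanly
        destination.write(chunk)
    return False             # aborted mid-transfer

# tiny in-memory demonstration
dst = io.BytesIO()
finished = download(io.BytesIO(b'x' * 100), dst, lambda: False)
result_when_aborted = download(io.BytesIO(b'y' * 100), io.BytesIO(),
                               lambda: True)
```

With this shape, a worker would pass lambda: self.abort as should_abort, and the 2-second sleep in closeEvent() would always be enough for one 10 MB slice.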


IS IT REALLY PARALLEL?

Well, not really. The same code in C or C++ would be fully parallel, but here we are bumping into the GIL, the Global Interpreter Lock of the MayaPy and CPython interpreters. You can have a look at the many posts about this on Google. Basically, the GIL is a mechanism that prevents more than one thread from executing Python bytecode at the same time, so that the interpreter's memory doesn't get corrupted.

If we want full parallelization, we should go for multiprocessing, which differs from multithreading in that each spawned process has its own memory space. It is ideal when you don't need to share objects between processes, or the need is small. On top of that, a lot of benchmarks people have run conclude that in Python, multithreaded CPU-bound code tends to take more time than the same code running in a single thread.

So if your task is CPU-expensive, go for multiprocessing rather than multithreading. But if your tasks are I/O-bound, networking, etc. (as is the case here), I find multithreading more suitable.
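One handy detail for such benchmarks: multiprocessing.Pool and its thread-backed twin multiprocessing.dummy.Pool share the same API, so switching between the two is a one-line change. A sketch with a toy CPU-bound function:

```python
from multiprocessing.dummy import Pool as ThreadPool  # thread-backed Pool
# from multiprocessing import Pool                    # process-backed, same API

def cpu_bound(n):
    '''A purely CPU-bound task: with threads the GIL serializes it,
    with processes it would run truly in parallel.'''
    total = 0
    for i in range(n):
        total += i * i
    return total

pool = ThreadPool(4)
results = pool.map(cpu_bound, [100000] * 4)
pool.close()
pool.join()
```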

Nevertheless, I would like to give the code another spin and run benchmarks, this time using the multiprocessing module.








Monday, April 18, 2016

FFMPEG and Multiprocessing

We are at the very end of production here; very few of us remain, and with those who left went much of our daily joy, because "there is no good or bad company": it's the people who make it up, the ones you deal with every day, that count and make the working environment such a great place.

Anyway, I'm going to talk (as usual) about the last tool I've had to code at work. Apparently there's been a mismatch of shots between the two studios involved, and from a production point of view they needed the whole movie in playblasts, in sequence, with the screen split in two: on top the last anim playblast and at the bottom the last refine, so they could compare and make sure the outside studio was getting the latest version for lighting, etc.

My first thought was to have a look at the Adobe Premiere SDK and see if by any chance it had a Python API I could play with.

After a bit of research I found there was no way to import an XML file with shots and durations and automatically convert it into a final video. Also, the only thing you can do with Premiere is plugin development in C++ at the "filters level", which means it is nowhere near as tweakable as Maya. It was complete overkill for my needs.

Then somehow I started to look for tools in Linux and bumped into ffmpeg. I was surprised and amazed it hadn't been my first choice! Now I would surely recommend it to anyone having to play with video compositing and mixing.

Now i can start to code!

TOOL SKELETON

First, iterate through the anim and refine folders. As there is no conflict and no need to share data, this can easily be parallelized. Each process fills a dictionary where the key looks like "ACT0X_SQ00XX_SH00XX" and the value is the complete filepath of the most recent file.

       
def return_file_dict(root, queue):
    '''
    iterate through the filesystem branch under root, fill file_dict with
    the most recent file per shot, and finally put it in the
    multiprocess-safe queue (tagged with its root)
    '''
    queue.put(root)
    queue.put(file_dict)

def main():
    # Process and Manager come from the multiprocessing module
    process_list = []
    queue = Manager().Queue()

    for root, dictionary in zip([DST_TMP_LAST_ANIM, DST_TMP_LAST_CROWD_OR_REFINE],
                                [last_anim_dict, last_crowdrefine_dict]):
        p = Process(target=return_file_dict, args=(root, queue))
        process_list.append(p)
        p.start()

    for p in process_list:
        p.join()

    '''
    Rescue both dictionaries and merge top/bottom with ffmpeg
    '''
 


After that, all I needed was to filter both dictionaries and, for each shot/entry in the dict, merge the two corresponding playblasts with ffmpeg.
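The "rescue" step elided above could look roughly like this (roots and filenames here are hypothetical). One design note: putting the root and its dictionary in a single tuple keeps the pair atomic, whereas two separate put() calls from different processes could interleave.

```python
from multiprocessing import Manager, Process

ROOTS = ['/tmp/last_anim', '/tmp/last_refine']   # hypothetical roots

def return_file_dict(root, queue):
    # stand-in for the real directory walk building shot -> newest filepath
    file_dict = {'ACT01_SQ0010_SH0010': root + '/SH0010_v003.mov'}
    # one tuple per process keeps the root/dict pairing atomic
    queue.put((root, file_dict))

def collect():
    queue = Manager().Queue()
    processes = [Process(target=return_file_dict, args=(root, queue))
                 for root in ROOTS]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    # rescue every (root, dict) pair the workers produced
    results = {}
    while not queue.empty():
        root, file_dict = queue.get()
        results[root] = file_dict
    return results
```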

My first attempt opened a gnome-terminal for each command. So the next thought was to pipe all the commands into a single string, which would then be executed in a single call to subprocess.call.

But there was a problem: there is a limit on the number of characters you can send as a command to subprocess.call. The idea was still good in the sense that it would require only one call, and everything would happen in the same terminal/Linux process.

The next logical step was to say: OK, I can't send all the commands as one string, but I can dump the string into a shell script file and execute that script from within the subprocess call!

       
    #
    # compound all the shell script commands into command_element_string
    #

    with open(FFMPEG_COMMANDS, 'w') as f:
        f.write(command_element_string)

    # run the whole script in a single terminal
    command = 'sh ' + FFMPEG_COMMANDS
    subprocess.call(['gnome-terminal', '-x', 'bash', '-c', command],
                    shell=False, env=os.environ.copy())
    
 


FFMPEG SHELL SCRIPT CALLED FROM SUBPROCESS

       
command_element_string += (
    'ffmpeg -y -i ' + last_anim_filepath +
    ' -i ' + last_crowdrefine_filepath +
    ' -filter_complex "[0:v]scale=w=999:h=540[v0];'
    '[1:v]scale=w=999:h=540[v1];'
    '[v0][v1]vstack=inputs=2[v]"'
    ' -map "[v]" -map $RESULT:a -ac 2 -b:v 4M ' +
    output_filepath + ';\n')
    
 


This forces the resolution of each of the videos we vertically stack to w=999, h=540. We force it because if the resolutions don't match, the conversion will fail.

Another important element here is the "$RESULT" value, which must be 0 or 1 depending on which input's audio track we choose to embed in the output file.

This can vary, since there were playblasts from anim as well as from refine that were missing their audio. $RESULT is the result of running an ffprobe test on each of the files to ask for audio info.

The only unhandled case left is when neither of the two files has an audio track, in which case the conversion fails. I could take care of this as well, but I haven't run into that case yet, so most probably I'm not going to treat it.

This is the embedded function in the shell script:

       
    command_element_string = 'function ttl_ffprobe()\n'
    command_element_string += '{\n'
    command_element_string += 'RESULT_ANIM="" ;\n'
    command_element_string += 'RESULT_REFINE="" ;\n'
    command_element_string += 'RESULT_ANIM=$(ffprobe -i $1 -show_streams -select_streams a -loglevel error) ;\n'
    command_element_string += 'RESULT_REFINE=$(ffprobe -i $2 -show_streams -select_streams a -loglevel error) ;\n'
    command_element_string += 'CHANNEL_SELECTION=0 ;\n'
    command_element_string += 'if [ -z "$RESULT_ANIM" ]\n'
    command_element_string += 'then\n'
    command_element_string += '\tCHANNEL_SELECTION=1\n'
    command_element_string += 'fi\n'
    command_element_string += 'return $CHANNEL_SELECTION\n'
    command_element_string += '}\n'
    
 

Once we have all the anim and refine playblasts stacked side by side, all that is left is to merge them into the final sequence/movie, which can easily be done with the "cat" command and a properly chosen container. I refer you to the documentation: https://ffmpeg.org/ffmpeg.html
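Plain cat-style concatenation only works for a few containers (MPEG transport streams, for instance); a more container-agnostic route is ffmpeg's concat demuxer, fed with a generated list file. A hypothetical sketch in the same build-the-command style as the rest of the tool (paths are made up):

```python
import subprocess

def concat_command(shot_files, list_path, output_path):
    '''Write the list file the concat demuxer expects and return the
    ffmpeg command; -c copy avoids re-encoding when all shots share
    the same codec parameters.'''
    with open(list_path, 'w') as f:
        for shot in shot_files:
            f.write("file '%s'\n" % shot)
    return ['ffmpeg', '-y', '-f', 'concat', '-safe', '0',
            '-i', list_path, '-c', 'copy', output_path]

cmd = concat_command(['/tmp/SH0010.mov', '/tmp/SH0020.mov'],
                     '/tmp/shots.txt', '/tmp/full_movie.mov')
# subprocess.call(cmd)   # run it once the stacked shot files exist
```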