Worked example: Adding a scheme/pipeline

You have just developed a pipeline which takes a bacterial genome sequence and calculates from it the universal Genome Statistic, a single number that summarises how virulent the bacterium is, its antibiotic resistance, the number of phages and plasmids it contains, and so on. However, before you publish it in Nature, you want to run this pipeline on all available genomes for a number of species, store the results and make them available to the public.

Adding the scheme to the database

The scheme just has one field, the universal genome statistic, which is an integer. First of all we have to add it to the schemes table (see Scheme Parameters (scheme table)).

description    name                        param
uni_gen_stat   Universal Genome statistic  {"display":"public","query_method":"process_generic_query"}

Next, add the parameter to the dataparam table (see SeroPred (Generic Scheme)).

tabname        name         display_order  nested_order  label                       datatype
uni_gen_stat   genome_stat  1              0             Universal Genome Statistic  integer

This scheme is public, as specified by "display":"public", and can be queried with the process_generic_query method, as specified by "query_method":"process_generic_query". This method looks for the scheme data in the assembly_lookup table, where each scheme/assembly combination has an entry. The scheme has one field, an integer genome_stat, which will be labelled Universal Genome Statistic. If the data are stored in a different way then a new query method will need to be defined and referenced by the "query_method" entry in the param column.
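
For orientation, the following is a rough sketch (not an actual database record) of what such an assembly_lookup entry ends up holding once the pipeline described later in this example has run; only the results/genome_stat value is used here and anything else in other_data is omitted:

#sketch only - the exact contents of assembly_lookup.other_data are not shown in this
#example; the local pipeline below writes the 'results' entry
other_data = {
    "results": {
        "genome_stat": 4321   #the single integer field declared in the dataparam table
    }
}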

Restart the server so that it is aware of the new scheme, go to the main search page and clear the browser cache so that client queries will pick up the new information. "Universal Genome Statistic" should now be an option in the right hand Experiment dropdown. If you select it, the right grid will change to show a single column, and all cells will show ND as we have not yet populated the scheme with any data.

https://bitbucket.org/repo/xzK7Xj/images/2020196653-ugs_1.png

In theory we could calculate and store the data elsewhere and write a method to get the data (see Scheme Based on an NCBI API call). However, the remote data would not be updated when new strains were added, hence we need to add the pipeline to Enterobase.

Adding a pipeline

In the scheme table we need to add a couple of entries, pipeline and params, to the dictionary in the param column - see below

{"display":"public",
"query_method":"process_generic_query",
"pipeline":"uni_gen_stat",
"params":{"taxon":"Salmonella"}}

The pipeline can be called by a number of different mechanisms. It can be called by a user (in edit mode on the main search page, Tools > Call Scheme For Selected) or using the Update Scheme script in manage.py (which is called automatically from the shell script daily_update.sh). As well as specifying the pipeline, you also have to instruct Enterobase how to store the data. All the logic for this is contained in a single Python class, with the send_job() method sending the job and process_job() storing the data when the results are obtained. For crobot pipelines the scheme's name (description) does not have to be the same as the pipeline specified in params. However, for local and other network jobs, this has to be the case.

Adding a Local Pipeline

This is not recommended unless your pipeline is very simple. The send_job() method simply calls process_job() and hence the pipeline is run in the current thread. Although you can farm the calculation off to celery, at the time of writing the number of threads celery can use is limited due to SQLAlchemy issues.

Creating a Job class

from entero.jobs.jobs import LocalJob,AssemblyBasedJob,network_job_classes
from entero import celery,app,dbhandle
from random import randint
import sys,os,time

class UGSGenerator(LocalJob,AssemblyBasedJob):

        def __init__(self,**kwargs):
                super(UGSGenerator,self).__init__(**kwargs)
                #if celery is turned off in the config, this job can still use celery with the following
                #self.use_celery=True

You need to create a class which inherits from LocalJob and AssemblyBasedJob; these base classes provide methods to store the data generated and to keep track of whether the job succeeded, the number of failures etc. You can add extra members in the constructor, but be aware that the object may be serialized when it is sent to celery.

Creating the Process Method

def process_job(self):
                success=True
                #read the genome file
                try:
                        with open(self.assembly_filepointer) as f:
                                for line in f:
                                        pass
                        #also have access to the assembly barcode,params and scheme_id
                        taxon = self.params.get("taxon")
                        barcode = self.assembly_barcode
                        scheme_id = self.scheme_id
                        #generate the ugs (don't tell anyone its just a random number)
                        ugs = randint(1,10000)
                        #going to store in lookup table
                        lookup=self.get_lookup()
                        lookup.other_data['results']['genome_stat']=ugs

                except Exception as e:
                        #write a message to the log - the stack trace will also be included
                        app.logger.exception("Problem with ugs_generator job,assembly barcode=%s" % self.assembly_barcode)
                        success=False
                self.job_processed(success)

For a local job, calling send_job() will just initiate process_job(), which will run in the same thread (unless celery has been enabled in the config or you have set self.use_celery=True in the class's constructor). Because we have inherited from AssemblyBasedJob we have access to the assembly file pointer, the assembly barcode and any parameters specified in the scheme table. We also have access to the SQLAlchemy lookup object, which maps to the assembly_lookup table. In this case we use it to store the result:

lookup=self.get_lookup()
lookup.other_data['results']['genome_stat']=ugs

Registering The Class

We need to add an entry to entero.jobs.jobs.network_job_classes so that Enterobase knows which class to use with each pipeline.

network_job_classes['uni_gen_stat']=getattr(sys.modules[__name__],"UGSGenerator")

If you create the class outside the jobs module, you need to import the module where the class is defined at the end of jobs.py.
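
For example, if UGSGenerator lived in its own module (the module name ugs_jobs below is made up for illustration), the import at the end of jobs.py would simply be:

#at the end of entero/jobs/jobs.py - ugs_jobs is an illustrative module name;
#importing it runs the network_job_classes registration in that module
from entero.jobs import ugs_jobs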

Putting it all together

from entero.jobs.jobs import LocalJob,AssemblyBasedJob,network_job_classes
from entero import celery,app,dbhandle
from random import randint
import sys,os,time

class UGSGenerator(LocalJob,AssemblyBasedJob):

        def __init__(self,**kwargs):
                super(UGSGenerator,self).__init__(**kwargs)
                #if celery is turned off in the config, this job can still use celery with the following
                #self.use_celery=True

        def process_job(self):
                success=True
                #read the genome file
                try:
                        with open(self.assembly_filepointer) as f:
                                for line in f:
                                        pass
                        #also have access to the assembly barcode,params and scheme_id
                        taxon = self.params.get("taxon")
                        barcode = self.assembly_barcode
                        scheme_id = self.scheme_id
                        #generate the ugs (don't tell anyone its just a random number)
                        ugs = randint(1,10000)
                        #going to store in lookup table
                        lookup=self.get_lookup()
                        lookup.other_data['results']['genome_stat']=ugs

                except Exception as e:
                        #write a message to the log - the stack trace will also be included
                        app.logger.exception("Problem with ugs_generator job,assembly barcode=%s" % self.assembly_barcode)
                        success=False
                self.job_processed(success)

#needed so that Enterobase knows which class to create based on the pipeline
network_job_classes['uni_gen_stat']=getattr(sys.modules[__name__],"UGSGenerator")

Running the pipeline

Running the Update Scheme script will check the database for any strains which contain genomes that have not yet got results from the pipeline and have not failed too many times.

python manage.py update_scheme -d miu -s uni_gen_stat -l 10

The -d option specifies the database (in this case the test miu database has been used). The -l option limits the number of calls to 10, so if we run the above command and then look in Enterobase, 10 records should have a universal genome statistic.

To test running with celery, uncomment self.use_celery=True in the constructor (or set USE_CELERY to True in the config), open a separate command window, navigate to the root Enterobase directory, make sure you are in the Enterobase virtual environment and run

python manage.py runcelery

The celery splash screen should appear. Then run the update_scheme script as before; this time information about adding the record should appear in the celery window.

Adding a crobot Pipeline

Using the GenericJob Class

See SeroPred (Generic Scheme) for another example.

Firstly (obviously) we need to add the pipeline to Crobot. However, all we need to do in Enterobase is (1) specify whereabouts in the returned JSON the data will be found and (2) tie our pipeline to the GenericJob class.

  1. If the data returned from crobot looked like this:-

{
    "comment": "null",
    "source_ip": "137.205.123.127",
    "tag": 2673082,
    "query_time": "2018-01-06 20:45:22.475990",
    "log": {
        "results": [
            "accept",
            23233    //universal genome stat
        ]
    },
    "_cpu": 1
}

Then in the dataparam table we would need to add log,results,1 to the mlst_field column to identify whereabouts in the returned data the results are to be found; each element of the comma-separated path is a dictionary key or a list index (see the sketch after the table below). Note that the 'log' data are the contents of the log file, i.e. the stdout output from the pipeline.

tabname        name         display_order  nested_order  label                       datatype  mlst_field
uni_gen_stat   genome_stat  1              0             Universal Genome Statistic  integer   log,results,1
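
The following is a rough sketch (not EnteroBase code) of how such a comma-separated path can be resolved against the returned JSON - each element of the path is treated as either a dictionary key or a list index:

#toy version of the returned data shown above
returned = {
    "comment": "null",
    "log": {"results": ["accept", 23233]},
}

def resolve(data, path):
    #walk the comma-separated path, treating each part as a key or a list index
    for part in path.split(","):
        if isinstance(data, list):
            data = data[int(part)]
        else:
            data = data[part]
    return data

print(resolve(returned, "log,results,1"))   #23233 - the universal genome statistic
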
  2. The only other thing we need to do is to tell Enterobase to use the GenericJob class. At the bottom of entero/jobs/jobs.py add
network_job_classes['uni_gen_stat']=getattr(sys.modules[__name__],"GenericJob")

Using A Custom Class

The GenericJob class will store the results in the assembly_lookup table, but if you have more complex data and want to store it differently you can create a subclass of GenericJob and override the process_job method. For example:

class UGSJob(GenericJob):

    def __init__(self,**kwargs):
            super(UGSJob,self).__init__(**kwargs)
            #the job is about to be sent (there are no results yet)
            if not self.results:
                    #inputs will already contain the assembly file pointer; other values can be added
                    barcode = self.assembly_barcode
                    #look up some extra value with the barcode and add it to the inputs
                    #(illustrative - 'value' stands for whatever you computed)
                    self.inputs['key'] = value
                    #params will contain anything specified in the scheme table
                    self.params['taxon']


    def process_job(self):
            success=True
            try:
                    #fields are a dictionary of key/value for fields specified in dataparm
                    ugs = self.fields['uni_gen_stat']
                    #have access to all the data in results
                    comment=self.results['comment']
                    #have access to the assembly barcode
                    assembly_id = decode(self.assembly_barcode)
                    #can still update the lookup_data
                    lookup=self.get_lookup()
                    lookup.param["comment"]=comment
                    #store the result in an external database
                    sql = "INSERT INTO ugs_results (ass_id,ugs_value) VALUES(%i,%i)" % (assembly_id,ugs)
                    #run the query
            except Exception as e:
                    #write a message to the log - the stack trace will also be included
                    app.logger.exception("Problem with processing UGSJob , job id=%i" % self.job_id)
                    success=False
            #need to send version
            self.job_processed(success,self.results['version'])

In the constructor you can also add any other parameters or inputs (the assembly file pointer and any params in the scheme table have already been added). In the process_job method you can then process and store the results. Obviously, if you store the data in a different format you will have to write a new query method (see Scheme Based on an NCBI API call) and specify it in the param column of the scheme table.
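
For instance, if the results lived in their own table rather than in assembly_lookup, the scheme's param entry might reference a bespoke query method; the method name process_ugs_query below is purely illustrative:

#"process_ugs_query" is a hypothetical query method that you would have to write;
#the rest of the dictionary is unchanged from before
{"display":"public",
 "query_method":"process_ugs_query",
 "pipeline":"uni_gen_stat",
 "params":{"taxon":"Salmonella"}}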

Sending The Job

Local and network jobs are called in exactly the same way, so the job can be sent as described in Running the Pipeline, using

python manage.py update_scheme -d miu -s uni_gen_stat -l 10

Adding a Remote Pipeline

This is not what the job mechanism was designed for, although it is still possible. Although there is no table to store results about the actual job, information will still be stored in the lookup table, i.e. whether it is complete, the version number, the number of times it has failed etc.

Creating The Class

#RemoteJob is assumed to live alongside the other job classes in entero.jobs.jobs
from entero.jobs.jobs import RemoteJob,AssemblyBasedJob,network_job_classes
from entero import app
import requests

class RemoteUGSJob(RemoteJob,AssemblyBasedJob):
        def __init__(self,**kwargs):
                super(RemoteUGSJob,self).__init__(**kwargs)
                #object created with returned data
                #need to add a few parameters so the data can be stored
                if self.results:
                        self.assembly_barcode=self.results['assembly_barcode']
                        self.scheme_id=self.results['scheme_id']
                        self.database=self.results['database']

        def send_job(self):
                success=True
                try:
                        URI = "http://remote_server/ugs_pipeline"
                        data={
                                #data to send (the remote server needs access to where the assemblies are stored)
                                "assembly_filepointer":self.assembly_filepointer,
                                #or send the whole sequence (not recommended)
                                #"sequence" :open(self.assembly_filepointer).read(),
                                "taxon":self.params['taxon'],
                                #the address the remote server will send the results back to
                                "callback":"http://"+app.config['CALLBACK_ADDRESS']+"/crobot_callback",
                                #all required to be sent back
                                "database":self.database,
                                "scheme_id":self.scheme_id,
                                "assembly_barcode":self.assembly_barcode
                                }
                        resp = requests.post(url = URI, data = data)
                        if resp.text!='OK':
                                raise Exception("UGS job did not send - response %s" % resp.text)
                except Exception as e:
                        app.logger.exception("UGS job could not be sent for assembly %s" % self.assembly_barcode)
                        success=False
                self.job_sent(success)

        def process_job(self):
                success=True
                #read the genome file
                version= 'ND'
                try:
                        version=self.results['version']
                        lookup=self.get_lookup()
                        lookup.other_data['results']['genome_stat']=self.results['genome_stat']

                except Exception as e:
                        #write a message to the log - the stack trace will also be included
                        app.logger.exception("Problem with ugs_generator job,assembly barcode=%s" % self.assembly_barcode)
                        success=False
                self.job_processed(success,version)

#needed so that Enterobase knows which class to create based on the pipeline
network_job_classes['uni_gen_stat']=getattr(sys.modules[__name__],"RemoteUGSJob")

The class inherits from RemoteJob and, as before, from AssemblyBasedJob. When data is returned from the remote pipeline we need to associate it with the scheme, assembly and database, and since (at the moment) there is nowhere to store these, they need to be sent back from the remote pipeline. This information is retrieved in the object's __init__ method when it is constructed with the returned data.

super(RemoteUGSJob,self).__init__(**kwargs)
#object created with returned data
#need to add a few parameters so data can be stored
if  self.results:
        self.assembly_barcode=self.results['assembly_barcode']
        self.scheme_id=self.results['scheme_id']
        self.database=self.results['database']

The send_job method is quite simple - we have access to the file pointer and any parameters defined in the scheme table. If we are only sending the file pointer, the remote pipeline obviously needs access to the shared area where the assemblies are stored. The whole sequence could be sent instead, but this would be a large overhead. The following have to be sent:

  • scheme id
  • assembly barcode
  • database
  • callback address (where the remote server will send the results to)

Finally, the process_job method will store the data returned from the server; the ugs value is stored in the lookup table as before.

The remote server could handle the request as follows:-

@main.route("ugs_pipeline",methods = ['POST'])
def ugs_pipeline():
    data = request.form
    #these are needed in the response
    response={
        'assembly_barcode':data['assembly_barcode'],
        'scheme_id':data['scheme_id'],
        'database':data['database'],
        "pipeline":"uni_gen_stat",
        "version":"4.4"
    }
    #get stuff we need
    taxon = data['taxon']
    pointer=data['assembly_filepointer']
    url= data['callback']
    #generate our result
    def our_pipeline():
        try:
            ugs = randint(1,1000)
            sleep(10)
            response['status']="COMPLETE"
        except:
            response['status']="FAILED"
        #add our result to the response
        response['genome_stat']=ugs
        json = ujson.dumps(response)
        requests.post(url,data={"CALLBACK":json})
    t = Thread(target=our_pipeline)
    t.start()

    return "OK"

This is an extremely contrived example, but the main point to note is that the following are required in the response (along with any computed values, obviously):

  • scheme_id -used to store results
  • assembly_barcode - used to store results
  • database - used to store results
  • pipeline - needed to get the right class
  • status - either COMPLETE or FAILED

The response needs to be converted to JSON and posted back to the callback address.
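
Putting that together, the JSON posted back to the callback by the contrived handler above would look something like this for a successful run (the values are illustrative):

#illustrative callback payload assembled from the response dictionary above
{
    "assembly_barcode": "ASSEMBLY_BARCODE",
    "scheme_id": "1",
    "database": "miu",
    "pipeline": "uni_gen_stat",
    "version": "4.4",
    "status": "COMPLETE",
    "genome_stat": 523
}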

Adding Functionality To the Grid

By default the BaseExperimentGrid in entero/static/js/table/base_experiment_grid.js is used to display the scheme's data. If we want to add extra functionality, we have to subclass it. In this case we are going to show any cells with a value >8000 with a red background, add an extra column containing an eye icon which, when clicked, will display information about the row, and add an option to the context menu.

https://bitbucket.org/repo/xzK7Xj/images/393054424-ugs_custom_grid.png

In order to customize the grid we need to subclass it and specify the JavaScript class and its containing file in the scheme’s params
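
The exact keys the param dictionary uses for this are not shown in this example, so the js_class and js_file names below are hypothetical placeholders - check how your installation expects the grid class and its file to be declared:

#the "js_class"/"js_file" key names are hypothetical - substitute whatever keys
#your installation uses to declare a custom grid
{"display":"public",
 "query_method":"process_generic_query",
 "pipeline":"uni_gen_stat",
 "params":{"taxon":"Salmonella"},
 "js_class":"UGSGrid",
 "js_file":"ugs_grid.js"}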

Creating The Grid

Add a file ugs_grid.js (which we specified in the scheme’s parameters) to entero/static/js/table and then define the class as follows

UGSGrid.prototype = Object.create(BaseExperimentGrid.prototype);
UGSGrid.prototype.constructor= UGSGrid;
function UGSGrid(name,config,species,scheme,scheme_name){
        BaseExperimentGrid.call(this,name,config,species,scheme,scheme_name);
};

Adding a Custom Renderer

We want to show all values greater than 8000 with a red background. In the constructor we can add the following code:

this.addExtraRenderer("genome_stat",function(cell,value,row_id,col_name_row_index){
                if (value>8000){
                        $(cell).css("background-color","red");
                }
});

Adding an Extra Column and Handlers

The grid will automatically be populated with the columns we specified in the dataparam table, but we can add other columns. In this case we are adding a column which contains an eye icon which, when clicked, will display information about the row. We can do this in the setMetaData method, which is called before the grid is added to the page. As well as adding the column in this method, we also add a renderer which displays an eye icon and a handler which calls the displayInfo method when a cell is clicked, passing the value of the genome_stat column.

Adding an Option to the Context Menu

We also add an option to the context menu to do the same thing as the eye icon. The following code will add a More Info option to the context menu, which will call the displayInfo method and pass the value of genome_stat:

UGSGrid.prototype.addToContextMenu =  function (row_index,col_index, target){
        var self = this;
        var extramenu=[ {
                         name: 'More Info',
                         title: 'More Info',
                         fun: function () {
                               col_index=self.getColumnIndex('genome_stat');
                               value =self.getValueAt(row_index,col_index);
                               self.displayInfo(value);
                        }
                }
        ];
        return extramenu;
}

Finally we need to add the actual displayInfo method

UGSGrid.prototype.displayInfo = function(value){
        Enterobase.modalAlert("The UGS value is "+ value);
}