Worked example: Adding a scheme/pipeline

A pipeline has been developed which takes a bacterial genome sequence and calculates from it the universal Genome Statistic, a single number that reflects how virulent the bacterium is, its antibiotic resistance, the number of phages and plasmids it contains, and so on. Before it is published, the pipeline needs to be run on all available genomes for a number of species, and the results stored and made available to the public.

Adding the scheme to the database

The scheme has just one field, the universal genome statistic, which is an integer. First of all it has to be added to the schemes table (see Scheme Parameters (scheme table)).

description     name                          param
uni_gen_stat    Universal Genome Statistic    {"display":"public","query_method":"process_generic_query"}

Next, the parameter is added to the dataparam table (see SeroPred):

tabname        name          display_order   nested_order   label                         datatype
uni_gen_stat   genome_stat   1               0              Universal Genome Statistic    integer

This scheme is public, as specified by "display":"public", and can be queried with the process_generic_query method, as specified by "query_method":"process_generic_query". This method looks for the scheme data in the assembly_lookup table, where each scheme/assembly combination has an entry. The scheme has one field, an integer genome_stat, which will be labelled Universal Genome Statistic. If the data are stored in a different way then a new query method will need to be defined and referenced by the "query_method" entry in the param column.
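
To make the storage concrete: once the pipeline described below has run, the value for each assembly ends up under the results key of the other_data column in the corresponding assembly_lookup row, which is presumably where process_generic_query picks it up. A minimal sketch of that JSON (other bookkeeping keys, such as the version and failure counts mentioned later, may also be present) would be:

{"results":{"genome_stat":4321}}

The field is keyed by its name in the dataparam table (genome_stat); 4321 is just an illustrative value.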

The server has to be restarted so that it is aware of the scheme, which will then appear on the associated search page. The browser cache should also be cleared so that client queries will pick up the new information. "Universal Genome Statistic" should now be an option in the right hand Experiment dropdown. If this is selected, the right grid will change to show a single column and all cells will show ND, as we have not yet populated the scheme with any data.

https://bitbucket.org/repo/xzK7Xj/images/2020196653-ugs_1.png

In theory the data could be calculated and stored elsewhere and a method written to get the data (see Scheme Based on an NCBI API call). However, the remote data would not be updated when new strains were added, hence we need to add the pipeline to EnteroBase.

Adding a pipeline

In the schemes table, a couple of entries need to be added to the dictionary in the param column, pipeline and params - see below:

{"display":"public",
"query_method":"process_generic_query",
"pipeline":"uni_gen_stat",
"params":{"taxon":"Salmonella"}}

The pipeline can be called by a number of different mechanisms. It can be called by a user (in edit mode on the main search page, Tools > Call Scheme For Selected) or using the Update Scheme script in manage.py (which is called automatically from the shell script daily_update.sh). As well as specifying the pipeline, EnteroBase has to be instructed how to store the data. All the logic for this is contained in a single Python class, with the send_job() method sending the job and the process_job() method storing the data when the results are obtained. For crobot pipelines the scheme's name (description) does not have to be the same as the pipeline specified in params; however, for local and other network jobs it does.

Adding a Local Pipeline

This is not recommended unless your pipeline is very simple. The send_job() method simply calls process_job() and hence the pipeline is run in the current thread. Although the calculation can be farmed off to celery, at the time of writing the number of threads celery can use is limited due to SQLAlchemy issues.

Creating a Job class

from entero.jobs.jobs import LocalJob,AssemblyBasedJob,network_job_classes
from entero import celery,app,dbhandle
from random import randint
import sys,os,time

class UGSGenerator(LocalJob,AssemblyBasedJob):

        def __init__(self,**kwargs):
                super(UGSGenerator,self).__init__(**kwargs)
                #if celery is turned off in the config, this job can still use celery with the following
                #self.use_celery=True

New pipeline job types typically require a new class in entero/jobs/jobs.py which inherits from LocalJob and AssemblyBasedJob and which has methods to store the data generated, keep track of whether the job succeeded, the number of failures etc. You can add extra members in the constructor, but be aware that the object may be serialized to send to celery.

Creating the Process Method

def process_job(self):
                success=True
                #read the genome file
                try:
                        with open(self.assembly_filepointer) as f:
                                for line in f:
                                        pass
                        #also have access to the assembly barcode,params and scheme_id
                        taxon = self.params.get("taxon")
                        barcode = self.assembly_barcode
                        scheme_id = self.scheme_id
                        #generate the ugs (don't tell anyone it's just a random number)
                        ugs = randint(1,10000)
                        #going to store in lookup table
                        lookup=self.get_lookup()
                        lookup.other_data['results']['genome_stat']=ugs

                except Exception as e:
                        #write a message to the log - the stack trace will also be included
                        app.logger.exception("Problem with ugs_generator job,assembly barcode=%s" % self.assembly_barcode)
                        success=False
                self.job_processed(success)

For a local job, calling send_job() will just initiate process_job(), which will run in the same thread (unless celery has been enabled in the config or you have set self.use_celery=True in the class's constructor). Because we have inherited from AssemblyBasedJob we have access to the assembly file pointer, the assembly barcode and any parameters specified in the scheme table. We also have access to the SQLAlchemy lookup object, which maps to the assembly_lookup table. In this case we use it to store the result:

lookup=self.get_lookup()
lookup.other_data['results']['genome_stat']=ugs

Registering The Class

We need to add an entry to entero.jobs.jobs.network_job_classes so that EnteroBase knows which class to use with each pipeline.

network_job_classes['uni_gen_stat']=getattr(sys.modules[__name__],"UGSGenerator")

If the class is created outside the jobs module, the module containing the class needs to be imported at the end of jobs.py.
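
For example, if UGSGenerator lived in a hypothetical module entero/custom_jobs/ugs_job.py (the path here is only illustrative), the end of jobs.py would need something like:

#at the end of entero/jobs/jobs.py - the module path below is hypothetical
from entero.custom_jobs import ugs_job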

Putting it all together

from entero.jobs.jobs import LocalJob,AssemblyBasedJob,network_job_classes
from entero import celery,app,dbhandle
from random import randint
import sys,os,time

class UGSGenerator(LocalJob,AssemblyBasedJob):

        def __init__(self,**kwargs):
                super(UGSGenerator,self).__init__(**kwargs)
                #if celery is turned off in the config, this job can still use celery with the following
                #self.use_celery=True

        def process_job(self):
                success=True
                #read the genome file
                try:
                        with open(self.assembly_filepointer) as f:
                                for line in f:
                                        pass
                        #also have access to the assembly barcode,params and scheme_id
                        taxon = self.params.get("taxon")
                        barcode = self.assembly_barcode
                        scheme_id = self.scheme_id
                        #generate the ugs (don't tell anyone it's just a random number)
                        ugs = randint(1,10000)
                        #going to store in lookup table
                        lookup=self.get_lookup()
                        lookup.other_data['results']['genome_stat']=ugs

                except Exception as e:
                        #write a message to the log - the stack trace will also be included
                        app.logger.exception("Problem with ugs_generator job,assembly barcode=%s" % self.assembly_barcode)
                        success=False
                self.job_processed(success)

#needed so that EnteroBase knows which class to create based on the pipeline
network_job_classes['uni_gen_stat']=getattr(sys.modules[__name__],"UGSGenerator")

Running the pipeline

Running the Update Scheme script will check the database for any strains which contain genomes that have not yet got results from the pipeline and have not failed too many times.

python manage.py update_scheme -d miu -s uni_gen_stat -l 10

The -d option specifies the database (in this case the test miu database has been used). The -l option limits the number of calls to 10. So if we run the above command and then look in EnteroBase, 10 records should have a Universal Genome Statistic.

To test running with celery, uncomment the self.use_celery=True line (or change USE_CELERY in the config to True) and open up a separate command window. Navigate to the root EnteroBase directory, make sure you are in the EnteroBase virtual environment and run

python manage.py runcelery

The celery splash screen should appear. Then run the update_scheme script as before, but this time information about adding the records should appear in the celery window.

Adding a crobot Pipeline

This example uses the GenericJob class. See SeroPred for another example.

Firstly (obviously) the pipeline needs to be added to Crobot. All that needs to be done in EnteroBase is to (1) specify where in the returned JSON string the data will be and (2) tie our pipeline to the GenericJob class.

  1. If the data returned from crobot looks like this:

{
    "comment": "null",
    "source_ip": "137.205.123.127",
    "tag": 2673082,
    "query_time": "2018-01-06 20:45:22.475990",
    "log": {
        "results": [
            "accept",
            23233   //the universal genome stat
        ]
    },
    "_cpu": 1
}

Then log,results,1 needs to be added to the mlst_field column of the dataparam table, to identify whereabouts in the returned data the results are to be found (see the sketch after this list). Note that the 'log' data are the contents of the log file, i.e. the stdout output from the job when it is run.

tabname        name          display_order   nested_order   label                         datatype   mlst_field
uni_gen_stat   genome_stat   1               0              Universal Genome Statistic    integer    log,results,1
  2. The only other thing to be done is to tell EnteroBase to use the GenericJob class. At the bottom of entero/jobs/jobs.py add
network_job_classes['uni_gen_stat']=getattr(sys.modules[__name__],"GenericJob")
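
The comma-separated mlst_field value is effectively a path into the returned data. The real lookup is handled internally by EnteroBase, but conceptually it resolves like this minimal Python sketch (the dict below is the example data from step 1):

#the (parsed) data returned from crobot, as shown in step 1
returned = {"log": {"results": ["accept", 23233]}, "tag": 2673082}
#the mlst_field path "log,results,1" walks into that structure
value = returned
for key in "log,results,1".split(","):
    #numeric parts of the path index into lists, the rest into dicts
    value = value[int(key)] if key.isdigit() else value[key]
print(value)  #23233 - the universal genome statistic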

Using A Custom Class

The GenericJob class will store the results in the assembly_lookup table, but if you have more complex data and want to store it differently, a subclass of GenericJob can be created which overrides the process_job method. For example:

class UGSJob(GenericJob):

    def __init__(self,**kwargs):
        super(UGSJob,self).__init__(**kwargs)
        #no results yet means the job is about to be sent
        if not self.results:
            #inputs will contain the assembly pointer, can add other values
            barcode = self.assembly_barcode
            #look up some value with the barcode and add it to the inputs
            value = None  #placeholder for whatever was looked up
            self.inputs['key'] = value
            #params will contain anything specified in the scheme table
            taxon = self.params['taxon']

    def process_job(self):
        success=True
        try:
            #fields is a dictionary of key/value pairs for the fields specified in dataparam
            ugs = self.fields['uni_gen_stat']
            #have access to all the data in results
            comment = self.results['comment']
            #have access to the assembly barcode
            assembly_id = decode(self.assembly_barcode)
            #can still update the lookup data
            lookup = self.get_lookup()
            lookup.param["comment"] = comment
            #store the result in an external database
            sql = "INSERT INTO ugs_results (ass_id,ugs_value) VALUES(%i,%i)" % (assembly_id,ugs)
            #run the query
        except Exception as e:
            #write a message to the log - the stack trace will also be included
            app.logger.exception("Problem with processing UGSJob, job id=%i" % self.job_id)
            success=False
        #the version needs to be sent as well
        self.job_processed(success,self.results['version'])

Another example is the AMRanalysisJob class in jobs/jobs.py. In the constructor any other parameters or inputs can be added (the assembly file pointer and any params in the scheme table have already been added). The process_job method then processes and stores the results. If the data are stored in a different format then a new query method will be needed to retrieve the data, and this should then be specified in the param column of the scheme table, e.g.:

{"display":"public",
"query_method":"new_db_query",
"pipeline":"uni_gen_stat",
"params":{"taxon":"Salmonella"}}

Sending The Job

Local and network jobs are called in exactly the same way, so the job can be called as described in Running the pipeline, using:

python manage.py update_scheme -d miu -s uni_gen_stat -l 10

Adding a Remote Pipeline

This approach is not currently used on EnteroBase, although it is still possible; there is, however, no table to store the complete results of the actual job. Information will still be stored in the assembly_lookup table, e.g. the summary results to be returned to the user, whether the job has completed, the version number, the number of times the job failed etc.

Creating The Class

#imports as for the local job example, plus requests for sending the job
from entero.jobs.jobs import RemoteJob,AssemblyBasedJob,network_job_classes
from entero import app
import requests
import sys

class RemoteUGSJob(RemoteJob,AssemblyBasedJob):

        def __init__(self,**kwargs):
                super(RemoteUGSJob,self).__init__(**kwargs)
                #object created with returned data
                #need to add a few parameters so data can be stored
                if self.results:
                        self.assembly_barcode=self.results['assembly_barcode']
                        self.scheme_id=self.results['scheme_id']
                        self.database=self.results['database']

        def send_job(self):
                success=True
                try:
                        URI = "http://remote_server/ugs_pipeline"
                        data={
                                #data to send (the remote server needs access to where the assemblies are stored)
                                "assembly_filepointer":self.assembly_filepointer,
                                #or send the whole sequence (not recommended)
                                #"sequence" :open(self.assembly_filepointer).read(),
                                "taxon":self.params['taxon'],
                                #the address the remote server will send the results back to
                                "callback":"http://"+app.config['CALLBACK_ADDRESS']+"/crobot_callback",
                                #all required to be sent back
                                "database":self.database,
                                "scheme_id":self.scheme_id,
                                "assembly_barcode":self.assembly_barcode
                                }
                        resp = requests.post(url = URI, data = data)
                        if resp.text != 'OK':
                                raise Exception("UGS job did not send - response %s" % resp.text)
                except Exception as e:
                        app.logger.exception("UGS job could not be sent for assembly %s" % self.assembly_barcode)
                        success=False
                self.job_sent(success)

        def process_job(self):
                success=True
                version= 'ND'
                try:
                        version=self.results['version']
                        lookup=self.get_lookup()
                        lookup.other_data['results']['genome_stat']=self.results['genome_stat']

                except Exception as e:
                        #write a message to the log - the stack trace will also be included
                        app.logger.exception("Problem with ugs_generator job, assembly barcode=%s" % self.assembly_barcode)
                        success=False
                self.job_processed(success,version)

#needed so that EnteroBase knows which class to create based on the pipeline
network_job_classes['uni_gen_stat']=getattr(sys.modules[__name__],"RemoteUGSJob")

The class inherits from RemoteJob and, as before, from AssemblyBasedJob. When data is returned from the remote pipeline we need to associate it with the scheme, assembly and database, and since (at the moment) there is nowhere to store these, they need to be sent back from the remote pipeline. This information is retrieved in the object's __init__ method when it is constructed with the returned data.

super(RemoteUGSJob,self).__init__(**kwargs)
#object created with returned data
#need to add a few parameters so data can be stored
if  self.results:
        self.assembly_barcode=self.results['assembly_barcode']
        self.scheme_id=self.results['scheme_id']
        self.database=self.results['database']

The send_job method is quite simple - we have access to the file pointer and any parameters defined in the scheme table. If only the file pointer is being sent, the remote pipeline would need access to the shared area. The sequence could be sent instead, but this would be a large overhead. The following have to be sent:

  • scheme id
  • assembly barcode
  • database
  • callback address (where the remote server will send the results to)

Finally, the process_job method will store the data returned from the server; the ugs value is stored in the lookup table as before.

The remote server could handle the request as follows:

@main.route("ugs_pipeline",methods = ['POST'])
def ugs_pipeline():
    data = request.form
    #these are needed in the response
    response={
        'assembly_barcode':data['assembly_barcode'],
        'scheme_id':data['scheme_id'],
        'database':data['database'],
        "pipeline":"uni_gen_stat",
        "version":"4.4"
    }
    #get stuff we need
    taxon = data['taxon']
    pointer=data['assembly_filepointer']
    url= data['callback']
    #generate our result
    def our_pipeline():
        try:
            ugs = randint(1,1000)
            sleep(10)
            response['status']="COMPLETE"
        except:
            response['status']="FAILED"
        #add our result to the response
        response['genome_stat']=ugs
        json = ujson.dumps(response)
        requests.post(url,data={"CALLBACK":json})
    t = Thread(target=our_pipeline)
    t.start()

    return "OK"

This is an extremely contrived example, but the main point to note is that the following are required in the response (along with any computed values, obviously):

  • scheme_id - used to store the results
  • assembly_barcode - used to store the results
  • database - used to store the results
  • pipeline - needed to get the right class
  • status - either COMPLETE or FAILED

The response needs to be jsonified and posted back to the callback address.

Adding Functionality To the Grid

By default the BaseExperimentGrid in entero/static/js/table/base_experiment_grid.js is used to display the scheme's data. If extra functionality needs to be added, it has to be subclassed. In this case any cell with a value >8000 will be shown with a red background, an extra column containing an eye icon will be added which, when clicked, will display information, and an option will be added to the context menu.

https://bitbucket.org/repo/xzK7Xj/images/393054424-ugs_custom_grid.png

In order to customize the grid we need to subclass it and specify the JavaScript class and its containing file in the scheme's params.
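
The exact parameter names for this are not shown in this example, so purely as a hypothetical illustration (check an existing customized scheme for the real key names), the scheme's param column might end up looking something like:

{"display":"public",
"query_method":"process_generic_query",
"pipeline":"uni_gen_stat",
"params":{"taxon":"Salmonella"},
//the two key names below are hypothetical
"grid_file":"ugs_grid.js",
"grid_class":"UGSGrid"}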

Creating The Grid

Add a file ugs_grid.js (which we specified in the scheme's parameters) to entero/static/js/table and then define the class as follows:

UGSGrid.prototype = Object.create(BaseExperimentGrid.prototype);
UGSGrid.prototype.constructor= UGSGrid;
function UGSGrid(name,config,species,scheme,scheme_name){
        BaseExperimentGrid.call(this,name,config,species,scheme,scheme_name);
};

Adding a Custom Renderer

We want to show all values greater than 8000 with a red background. In the constructor we can add the following code:

this.addExtraRenderer("genome_stat",function(cell,value,row_id,col_name_row_index){
                if (value>8000){
                        $(cell).css("background-color","red");
                }
});

Adding an Extra Column and Handlers

The grid will automatically be populated with the columns we specified in the dataparam table, but we can add other columns. In this case we are adding a column which contains an eye icon, which when clicked will display information about the row. We can do this in the setMetaData method, which is called before the grid is added to the page. As well as adding the column in this method, we also add a renderer which displays an eye icon and a handler which calls the displayInfo method when a cell is clicked, passing the value of the genome_stat column.

Adding an Option to the Context Menu

We also add an option to the context menu to do the same thing as the eye icon. The following code will add a More Info option to the context menu, which will call the displayInfo method and pass the value of genome_stat:

UGSGrid.prototype.addToContextMenu =  function (row_index,col_index, target){
        var self = this;
        var extramenu=[ {
                         name: 'More Info',
                         title: 'More Info',
                         fun: function () {
                               col_index=self.getColumnIndex('genome_stat');
                               var value = self.getValueAt(row_index,col_index);
                               self.displayInfo(value);
                        }
                }
        ];
        return extramenu;
}

Finally we need to add the actual displayInfo method:

UGSGrid.prototype.displayInfo = function(value){
        Enterobase.modalAlert("The UGS value is "+ value);
}