Jobs

Database Tables That Hold Information About Jobs

user_jobs

All crobot jobs are kept in the user_jobs table. This table was designed early on and never modified, hence the accession and pipeline fields contain different types of values depending on the nature of the job.

  • job_id The same as the crobot job id
  • accession This value is used to get a handle on which assembly or analysis is tied to the job. It is the ‘tag’ value in CRobot objects
    • For Assembly jobs this is a comma-delimited list of SRA accession numbers. For user-uploaded reads it is represented by trace_accession#read1_name#read2_name.
    • For Analysis jobs it can vary, but is usually analysis_name: analysis_id or just the analysis id.
    • For Nomenclature or other schemes it is the barcode of the assembly being processed
  • user_id The id of the user
  • status The status that crobot returns
  • pipeline This is the value used to get the correct class to process the job when it returns (in entero.jobs.jobs.get_crobot_object). In the majority of cases it is the same as the crobot pipeline, but not always. Extra information or comments can be stored after a semicolon in the value; anything after the semicolon is ignored. It is the ‘type’ value in CRobot objects (a dispatch sketch follows this list).
    • For Assembly jobs it is the name of the crobot pipeline - QAssembly
    • For nomenclature jobs it is in the form nomenclature:scheme_name, e.g. nomenclature:Salmonella_rMLST
    • For non-nomenclature schemes/experiments it is the scheme description (name) in the scheme table, which is usually the crobot pipeline, e.g. SeroPred.
    • For Analysis jobs, again, this value is usually the same as the crobot pipeline. An example where it differs is a Locus Search, where this value will be ‘locus_search’, but the crobot pipeline is actually nomenclature. Therefore, when a nomenclature job created by a locus search is returned, it will be processed by a LocusSearchJob object, not a NomenclatureJob object.
  • date_sent The date sent
  • database The name of the database
  • output_location - only used for assemblies at the moment
  • version The crobot pipeline version
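
A minimal sketch of how the pipeline value might be turned into a class name; the real logic lives in entero.jobs.jobs.get_crobot_object, and the helper below, including the names it returns, is illustrative only.

def class_for_pipeline(pipeline):
    # Anything after a semicolon is extra information and is ignored.
    name = pipeline.split(';', 1)[0].strip()
    if name == 'QAssembly':
        return 'AssemblyJob'          # assembly jobs
    if name.startswith('nomenclature:'):
        return 'NomenclatureJob'      # e.g. nomenclature:Salmonella_rMLST
    if name == 'locus_search':
        return 'LocusSearchJob'       # crobot pipeline is nomenclature here
    return 'GenericJob'               # other schemes use the scheme description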

assembly_lookup

For assemblies, information about how many jobs have been sent for a particular scheme, the id of the latest job, and whether the job is complete or failed is stored in the assembly_lookup table.

  • assembly_id The id of the assembly

  • scheme_id The id of the scheme

  • version The latest version

  • st_barcode For nomenclature jobs this stores the Nserv ST barcode for the assembly/scheme. For snp_calls it is the barcode of the reference assembly used to call the SNPs. For other schemes it is blank or contains EGG ??

  • annotation For nomenclature schemes this contains the GFF representation of the loci positions in the genome. This could be stored elsewhere, as it takes up the most space in the database. It is only used to create JBrowse annotations and to look up loci positions in the Locus Search and Accessory Genome pages. This is done by the get_allele_details_for_strain() method in entero.main.views (which is surprisingly fast, given that it has to retrieve the entire text and perform a linear search)

  • status the current status - FAILED, QUEUED or COMPLETE

  • other_data Older entries may contain other fields that were never used. The only value that is used (apart from results) is fail_count, which is used by the [update_scheme](Maintenance Scripts#markdown-header-update_scheme) script to decide whether to resend a job. An example entry is sketched after this list.
    • job_id the id of the job
    • job_count the number of times a job for this assembly/scheme combination has been sent
    • complete_count the number of times jobs have been processed. Not always the same as job_count if the same job has had multiple callbacks
    • queued_count the number of queued jobs (something is wrong with the job scheduling if this is greater than 1)
    • fail_count the number of jobs that have failed
    • results a dictionary of results
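
For illustration, a complete other_data entry might look like the following; the values are made up, and only fail_count and results are actually read back.

other_data = {
    "job_id": 2323232,        # id of the latest job
    "job_count": 2,           # times a job has been sent for this assembly/scheme
    "complete_count": 2,      # times callbacks have been processed
    "queued_count": 0,        # should never be greater than 1
    "fail_count": 1,          # used by update_scheme to decide whether to resend
    "results": {},            # dictionary of results
}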

user_preferences

For Analysis jobs, e.g. MStrees and SNPs, information about the job is stored in the data column of the user_preferences entry.

  • For single jobs, if a job is complete it will have "complete":"true" in the data; if it cannot be sent or is returned as failed, the value will be "failed":"true". Also, if a job was successfully sent, job_id will be present in the data.
  • For an SNP Project, which requires multiple jobs, each job has a tag with not_sent, queued or complete and another tag with the job's id, e.g. "ref_mapper_matrix":"queued", "ref_mapper_matrix_id":2323232 (see the example below).
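
For illustration, the data column might contain entries like these (the values are made up; the key names follow the description above):

# A single analysis job that was sent successfully and has completed:
single_job_data = {"job_id": 1234567, "complete": "true"}

# A single job that could not be sent or was returned as failed:
failed_job_data = {"failed": "true"}

# Part of an SNP Project, which tracks several crobot jobs, each with a
# status tag (not_sent, queued or complete) and a matching *_id tag:
snp_project_data = {
    "ref_mapper_matrix": "queued",
    "ref_mapper_matrix_id": 2323232,
}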

Job Management

When crobot sends back the results of a job it calls /e

Assembly Jobs

The script [update_assemblies](Maintenance Scripts#markdown-header-update_assemblies) will attempt to find all strain records which do not have a valid assembly but do have Illumina paired-end traces, and assemble them.

  • Users can request assemblies by calling /api/v1.0/assemble; the actual method is the put method of AssembleAPI in entero/api_1_0/jobs.py (a sketch follows this list).
  • When a user uploads files, or they are loaded from a folder, the file_upload_success method in entero/upload/views.py is called, which calls assemble_user_uploads in entero.cell_app.tasks. This method adds the strain to the database based on metadata stored in user_uploads.
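
A hedged sketch of driving the assembly endpoint from a script; only the URL path and the HTTP verb come from this page, while the server address, payload keys and authentication are assumptions and are really defined by AssembleAPI.put in entero/api_1_0/jobs.py.

import requests

# Hypothetical request; the parameter names may differ from the real API.
resp = requests.put(
    "https://<enterobase-server>/api/v1.0/assemble",
    json={"database": "senterica", "accessions": "SRR000001,SRR000002"},
)
print(resp.status_code, resp.text)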

Job Classes

https://bitbucket.org/repo/xzK7Xj/images/216576764-jobs.png

Job classes are in entero/jobs/jobs.py. They are constructed either with parameters particular to the type of job, or by the get_crobot_job method, giving either the id of the job or the data returned from crobot (as sketched below):

get_crobot_job(id=id,data=data)
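
For example, a job object can be rebuilt in either of the two ways; the callback keys shown here are assumptions based on the user_jobs fields described above.

from entero.jobs.jobs import get_crobot_job

# Rebuild a job from its id, e.g. to resend it:
job = get_crobot_job(id=1234567)
job.send_job()

# Or rebuild it from the data crobot posts back and process the results
# (the exact keys in the callback payload are an assumption here):
job = get_crobot_job(data={"job_id": 1234567, "status": "COMPLETE"})
job.process_job()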

LocalJob

In Local jobs send_job() simply calls process_job(), hence they are run instantly in the same thread. If USE_CELERY in the config is True, or if the object's use_celery property is True, the job will instead be sent to the celery message queue. In practice this involves reconstructing the job object in the celery thread and calling process_job, hence all members must be serializable.
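
A minimal sketch of this dispatch, assuming app and celery can be imported from entero; the task name process_local_job and the class below are illustrative, not the actual implementation.

from entero import app, celery

@celery.task
def process_local_job(job):
    # Hypothetical task: the job object is pickled, rebuilt in the celery
    # worker and processed there (CELERY_ACCEPT_CONTENT includes 'pickle').
    job.process_job()

class LocalJobSketch(object):
    use_celery = False   # per-object override of the global USE_CELERY switch

    def send_job(self):
        if app.config.get('USE_CELERY') or self.use_celery:
            # All members must be serializable for this to work.
            process_local_job.apply_async(args=[self], queue="entero")
        else:
            # Run instantly in the same thread.
            self.process_job()

    def process_job(self):
        raise NotImplementedError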

RemoteJob

Remote jobs need to supply send_job() and process_job() methods. The __init__ method needs to account for the fact that the job can either be created with the parameters ready to send the job, or with the data returned after the job has been processed.
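
A sketch of the two construction paths such a job has to handle; the attribute names here are illustrative only.

class RemoteJobSketch(object):
    """Illustrative only: shows the two ways __init__ may be called."""

    def __init__(self, data=None, **params):
        if data is not None:
            # The job has already run: rebuild state from the crobot callback.
            self.job_id = data.get("job_id")
            self.returned_data = data
        else:
            # The job is being created: keep the parameters needed to send it.
            self.params = params

    def send_job(self):
        # Subclasses must submit the job to crobot.
        raise NotImplementedError

    def process_job(self):
        # Subclasses must handle the data returned by crobot.
        raise NotImplementedError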

CrobotJob

All jobs (except the toy examples) ultimately inherit from CrobotJob. CrobotJob provides an init method which will construct the object from the data returned from crobot or from the job_id; the job will then be ready to be processed or resent. The job can also be constructed de novo with all the parameters, inputs and reads required by crobot. In practice, however, the init method is overridden in sub-classes and simple parameters are converted into what crobot requires. Two important properties are tag and type:

  • tag This is used as a handle to the object that the job has been run for. It is stored in the accession field of the user_jobs table. For assembly-based jobs it is the assembly barcode, for AssemblyJobs it is the trace accession(s) and for analysis jobs, the id of the analysis object
  • type This is stored in the pipeline field of the user_jobs table and in most cases is the same as the crobot pipeline (see above)

AssemblyBasedJob

These jobs are for a ‘scheme’ on an individual assembly and should also inherit from a LocalJob or RemoteJob. Examples are NomenclatureJob, GenericJob and CRISPRJob. They are constructed with the SQLAlchemy scheme object, database, assembly_barcode and assembly_filepointer; user_id, workgroup and priority are optional.

e.g.

Schemes = dbhandle['senterica'].models.Schemes
scheme = dbhandle["senterica"].session.query(Schemes).filter_by(description='MLST_Achtman').one()
job = NomenclatureJob(scheme=scheme,
                   assembly_filepointer="/share_space/interact/outputs/2686/2686203/SAL_YA3618AA_AS.scaffold.fastq",
                   assembly_barcode="SAL_YA3618AA_AS",
                   database='senterica',
                   user_id=55)
job.send_job()

Celery

Running Celery

Celery is a task queue which allows methods to run asynchronously in a different thread. For it to work, rabbitmq (the message broker) has to be installed and running. In Enterobase, Celery is run with the following command in order to be able to access all of Enterobase's resources. You can specify the number of threads (the default is 1, and at the time of writing, using multiple threads caused SQLAlchemy problems):

python manage.py runcelery -t 1

This has to be run as a daemon. If using supervisord, the following can be included in supervisord.conf:

[program:celery]
; path should be the python exe in the enterobase virtual environment
command=/var/www/venv/bin/python manage.py runcelery
;the directory is where manage.py is located in the local installation
directory=/var/www/entero/
;user must have correct permissions
user=enterobase
autorestart=true

From time to time rabbitmq will stop working. You can check its status with

service rabbitmq-server status

If it is down, you can restart it with

service rabbitmq-server start

Celery Setup In Enterobase

The way it is actually set up in Enterobase is very simple - in entero/__init__.py is the following code:

from celery import Celery
celery = Celery(__name__)
celery.conf.update(app.config)

The config for celery is in the main Enterobase config and assumes that rabbitmq is running locally:

BROKER_URL = 'amqp://guest:guest@localhost:5672//'
#CELERY_IMPORTS = ('entero.jobs.jobs', )
CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['pickle']
USE_CELERY=False

To use celery, just import celery from entero and decorate the method with @celery.task:

from entero import celery

@celery.task
def my_method():
    # do stuff
    pass

N.B. If the module where the method resides is not imported when the app is loaded, it has to be imported in the runcelery method of manage.py in order for it to be registered.
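
For example, if my_method lives in a module that nothing else imports at start-up, a sketch of the import inside the runcelery command (the surrounding code depends on how manage.py is written and is omitted here):

def runcelery(threads=1):
    # Modules holding @celery.task methods that are not imported when the app
    # loads must be imported here so the worker registers their tasks.
    import entero.my_module          # hypothetical module containing my_method
    ...                              # start the celery worker as before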

When calling a celery method it is best to provide an alternative in case celery is not running, e.g.

if app.config['USE_CELERY']:
      my_method.apply_async(args=[arg1,arg2],kwargs={"arg3":"value"},queue="entero")
else:
      my_method(arg1,arg2,arg3="value")