#!/usr/bin/python
import time

import redis
import MySQLdb


def checkstatus(project, mode, queue_name):
    """Check the status of every worker thread for a project/mode."""
    global r_server

    ids_q_name = "ProjectQueue[%s][workers][%s][ids]" % (project, mode)
    print ids_q_name
    if not r_server.exists(ids_q_name):
        # No workers registered for this mode yet; check again on the next poll.
        return

    # Get all the workers processing this project
    worker_ids = r_server.lrange(ids_q_name, 0, -1)

    # Map the mode to the status field its threads report on
    if mode == "scraper":
        status = "scraped"
    elif mode == "processor":
        status = "processed"
    else:
        status = "finished"

    # Loop through all worker ids and check each thread's individual status
    for worker_id in worker_ids:
        thids_q_name = "ProjectQueue[%s][workers][%s][%s][threads]" % (project, mode, worker_id)
        if not r_server.exists(thids_q_name):
            continue
        # Get all the threads running under this worker
        thids = r_server.lrange(thids_q_name, 0, -1)
        for th_id in thids:
            status_q_name = "ProjectQueue[%s][workers][%s][%s][%s]" % (project, mode, worker_id, th_id)
            # A missing hash or any value other than '2' means this mode is not done
            if r_server.hget(status_q_name, status) != '2':
                return 'false'

    # Every thread reported '2' for this mode
    if status == 'finished':
        # The whole pipeline is done: tear down every queue for this project
        scraper_del_name = "ProjectQueue[%s][workers][scraper][ids]" % project
        processor_del_name = "ProjectQueue[%s][workers][processor][ids]" % project
        finisher_del_name = "ProjectQueue[%s][workers][finisher][ids]" % project
        print scraper_del_name, processor_del_name, finisher_del_name

        # Remove the per-mode worker-id queues
        r_server.delete(scraper_del_name)
        r_server.delete(processor_del_name)
        r_server.delete(finisher_del_name)

        for worker_id in worker_ids:
            for del_mode in ('scraper', 'processor', 'finisher'):
                ths_del = "ProjectQueue[%s][workers][%s][%s][threads]" % (project, del_mode, worker_id)
                # Read this worker's thread ids before deleting the list that holds them
                for th_id in r_server.lrange(ths_del, 0, -1):
                    th_del = "ProjectQueue[%s][workers][%s][%s][%s]" % (project, del_mode, worker_id, th_id)
                    # Remove the per-thread status hash
                    r_server.delete(th_del)
                # Remove the thread-id queue itself
                r_server.delete(ths_del)

        # Remove the project from the main queue so it is not polled again
        r_server.lrem('ProjectQueue', project, 0)

        # Mark the project finished in MySQL (parameterised to avoid SQL injection)
        sql = "UPDATE projects SET scraped = 2, processed = 2, finished = 2 WHERE id = %s"
        try:
            # Execute the SQL command and commit the change
            cursor.execute(sql, (project,))
            db.commit()
        except MySQLdb.Error, e:
            # Roll back in case there is any error
            print e
            db.rollback()
        return

    # Record this stage as complete on the project's progress hash
    r_server.hset(queue_name, status, 2)
    return


# Open database connection
db = MySQLdb.connect("167.114.96.213", "auditor", "yb9738z", "auditor_distributed")

# Prepare a cursor object using the cursor() method
cursor = db.cursor()

# Make the Redis connection
r_server = redis.Redis(host='localhost', port=9991, db=0,
                       password='SAManagementServer2016',
                       socket_timeout=None, connection_pool=None,
                       charset='utf-8')

modes = ['scraper', 'processor', 'finisher']
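# For reference, the Redis key layout this monitor relies on (inferred from
# the key formats used above; nothing here is beyond what the code touches):
#   ProjectQueue                                      list of project ids in progress
#   ProjectQueue[<p>][progress]                       hash with crawled/scraped/processed/finished flags
#   ProjectQueue[<p>][workers][<mode>][ids]           list of worker ids for a mode
#   ProjectQueue[<p>][workers][<mode>][<w>][threads]  list of thread ids for a worker
#   ProjectQueue[<p>][workers][<mode>][<w>][<t>]      per-thread status hash
# A status field counts as complete once its value reaches '2'.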
# Poll forever: get the projects under process in the queue and check each one
while True:
    projects = r_server.lrange('ProjectQueue', 0, -1)
    # Loop through the project list and check the status of every mode
    for project in projects:
        # The main progress hash must report crawling as complete first
        q_name = 'ProjectQueue[%s][progress]' % project
        c_status = r_server.hget(q_name, 'crawled')
        if c_status != '2':
            continue
        for mode in modes:
            status = checkstatus(project, mode, q_name)
            if status == 'false':
                # This mode is not finished yet, so later modes cannot be either
                break
    # Throttle the polling loop instead of hammering Redis
    time.sleep(5)