JavaXT
|
|
QueryJob Classpackage javaxt.express.services; import javaxt.express.ServiceResponse; import javaxt.express.ServiceRequest; import javaxt.express.User; import java.sql.SQLException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.io.StringReader; import java.math.BigDecimal; import javaxt.sql.*; import javaxt.json.*; import static javaxt.utils.Console.console; import net.sf.jsqlparser.parser.*; import net.sf.jsqlparser.statement.select.*; import net.sf.jsqlparser.statement.Statement; import net.sf.jsqlparser.statement.Statements; import net.sf.jsqlparser.statement.create.table.CreateTable; import net.sf.jsqlparser.expression.LongValue; //****************************************************************************** //** QueryService //****************************************************************************** /** * Provides a set of web methods used to query the database. Loosely based * on the CartoDB SQL API: https://carto.com/developers/sql-api/reference/ * ******************************************************************************/ public class QueryService { private javaxt.io.Directory jobDir; private javaxt.io.Directory logDir; private Map<String, QueryJob> jobs = new ConcurrentHashMap<>(); private List<String> pendingJobs = new LinkedList<>(); private List<String> completedJobs = new LinkedList<>(); private java.util.List<SelectItem> selectCount; //************************************************************************** //** Constructor //************************************************************************** public QueryService(Database database, javaxt.io.Directory jobDir, javaxt.io.Directory logDir){ //Set path to the jobs directory if (jobDir!=null) if (!jobDir.exists()) jobDir.create(); if (jobDir==null || !jobDir.exists()){ throw new IllegalArgumentException("Invalid \"jobDir\""); } this.jobDir = jobDir; //Set path to the log directory if (logDir!=null) if (!logDir.exists()) logDir.create(); if (logDir!=null && logDir.exists()){ this.logDir = logDir; } //Delete any orphan sql jobs for (javaxt.io.Directory dir : jobDir.getSubDirectories()){ dir.delete(); } //Test whether JSqlParser is in the classpath and parse a default "select count(*)" statement try{ CCJSqlParserManager parserManager = new CCJSqlParserManager(); Select select = (Select) parserManager.parse(new StringReader("SELECT count(*) FROM T")); PlainSelect plainSelect = (PlainSelect) select.getSelectBody(); selectCount = plainSelect.getSelectItems(); } catch(Throwable t){ throw new IllegalArgumentException("Failed to instantiate JSqlParser"); } //Spawn threads used to execute queries int numThreads = 1; //TODO: Make configurable... for (int i=0; i<numThreads; i++){ new Thread(new QueryProcessor(database, this)).start(); } } //************************************************************************** //** getServiceResponse //************************************************************************** /** Used to generate a response to an HTTP request. The default routes are * as follows: * <ul> * <li>POST /job - Used to create a new query job and return a jobID</li> * <li>GET /job/{jobID} - Returns query results or job status for a given jobID</li> * <li>DELETE /job/{jobID} - Used to cancel query for a given jobID </li> * <li>GET /jobs - Returns a list of all query jobs associated with the user</li> * <li>GET /tables - Returns a list of all the tables in the database</li> * </ul> */ public ServiceResponse getServiceResponse(ServiceRequest request, Database database) { String path = request.getPath(0).toString(); if (path!=null){ if (path.equals("jobs")){ return list(request); } else if (path.equals("job")){ String method = request.getRequest().getMethod(); if (method.equals("GET")){ return getJob(request); } else if (method.equals("POST")){ return query(request, true); } else if (method.equals("DELETE")){ return cancel(request, database); } else{ return new ServiceResponse(501, "Not implemented"); } } else if (path.equals("tables")){ return getTables(request, database); } else{ return new ServiceResponse(501, "Not implemented"); } } else{ return query(request, false); } } //************************************************************************** //** notify //************************************************************************** public void notify(QueryJob job){} //************************************************************************** //** query //************************************************************************** private ServiceResponse query(ServiceRequest request, boolean async) { try{ //Get query String query = getParameter("q", request).toString(); if (query==null) query = getParameter("query", request).toString(); if (query==null) throw new IllegalArgumentException("Query is required"); //Get Offset and Limit Long offset = getParameter("offset", request).toLong(); Long limit = getParameter("limit", request).toLong(); if (limit==null) limit = 25L; if (offset==null){ Long page = getParameter("page", request).toLong(); if (page!=null) offset = (page*limit)-limit; } //Parse sql statement using JSQLParser Select select = null; CreateTable createTempTable = null; Statements statements = CCJSqlParserUtil.parseStatements(query); if (statements!=null){ Iterator<Statement> it = statements.getStatements().iterator(); while (it.hasNext()){ Statement statement = it.next(); if (statement instanceof CreateTable){ CreateTable createTable = (CreateTable) statement; boolean isTemporaryTable = false; Iterator<String> i2 = createTable.getCreateOptionsStrings().iterator(); while (i2.hasNext()){ String option = i2.next(); if (option.equalsIgnoreCase("TEMPORARY")) isTemporaryTable = true; break; } if (isTemporaryTable){ if (select!=null) throw new IllegalArgumentException("Temporary table must be created before the SELECT statement"); if (createTempTable!=null) throw new IllegalArgumentException("Only 1 temp table allowed"); createTempTable = createTable; } else{ throw new IllegalArgumentException("CREATE TABLE statements not allowed"); } } else if (statement instanceof Select){ if (select!=null) throw new IllegalArgumentException("Only 1 SELECT statement allowed"); select = (Select) statement; } else{ throw new IllegalArgumentException(statement.getClass().getSimpleName() + " statements not allowed"); } } } //Check whether the select statement has illegal or unsupported functions checkSelect((PlainSelect) select.getSelectBody()); //Collect misc params JSONObject params = new JSONObject(); params.set("format", getParameter("format", request).toString()); Boolean addMetadata = getParameter("metadata", request).toBoolean(); if (addMetadata!=null && addMetadata==true){ params.set("metadata", true); } Boolean count = getParameter("count", request).toBoolean(); if (count!=null && count==true){ params.set("count", true); } //Create job User user = (User) request.getUser(); QueryJob job = new QueryJob(user.getID(), select, offset, limit, params); if (createTempTable!=null) job.addTempTable(createTempTable); String key = job.getKey(); job.log(); notify(job); //Update list of jobs synchronized(jobs) { jobs.put(key, job); jobs.notify(); } //Update pendingJobs synchronized(pendingJobs) { pendingJobs.add(key); pendingJobs.notify(); } //Generate response if (async){ return new ServiceResponse(job.toJson()); } else{ synchronized (completedJobs) { while (!completedJobs.contains(key)) { try { completedJobs.wait(); } catch (InterruptedException e) { break; } } } return getJobResponse(job); } } catch(Exception e){ if (e instanceof net.sf.jsqlparser.JSQLParserException){ e = new Exception("Unsupported or Invalid SQL Statement"); } return new ServiceResponse(e); } } //************************************************************************** //** checkSelect //************************************************************************** /** Used to check whether the select statement has illegal or unsupported * functions */ protected void checkSelect(PlainSelect plainSelect){} //************************************************************************** //** Writer //************************************************************************** /** Used to generate json, csv, tsv, etc using records from the database */ private class Writer { private String format; private StringBuilder str; private long x = 0; private Long elapsedTime; private Long count; private JSONArray metadata; private boolean addMetadata = false; private boolean isClosed = false; public Writer(String format, boolean addMetadata){ str = new StringBuilder(); this.format = format; this.addMetadata = addMetadata; if (format.equals("json")){ str.append("{\"rows\":["); } } public void write(Recordset rs){ if (isClosed) return; //throw exception? Field[] fields = rs.getFields(); if (x==0){ metadata = new JSONArray(); int count = 1; for (Field field : fields) { JSONObject json = new JSONObject(); json.set("id", count); json.set("name", field.getName()); json.set("type", field.getType()); json.set("class", field.getClassName()); json.set("table", field.getTableName()); metadata.add(json); count++; } if (format.equals("tsv") || format.equals("csv")){ String s = format.equals("tsv") ? "\t" : ","; for (int i=0; i<fields.length; i++){ if (i>0) str.append(s); str.append(fields[i].getName()); } str.append("\r\n"); } } if (format.equals("json")){ JSONObject json = new JSONObject(); for (Field field : fields){ Object val = field.getValue().toObject(); if (val==null){ val = "null"; } else{ if (val instanceof String){ String s = (String) val; if (s.trim().length()==0) val = "null"; } } json.set(field.getName(), val); } if (x>0) str.append(","); str.append(json.toString().replace("\"null\"", "null")); //<-- this is a bit of a hack... } else if (format.equals("tsv") || format.equals("csv")){ String s = format.equals("tsv") ? "\t" : ","; for (int i=0; i<fields.length; i++){ if (i>0) str.append(s); Object value = fields[i].getValue().toObject(); if (value==null){ value = ""; } else{ if (value instanceof String){ String v = (String) value; if (v.contains(s)){ value = "\"" + v + "\""; } } else if (value instanceof javaxt.utils.Date) { value = ((javaxt.utils.Date) value).toISOString(); } else if (value instanceof java.util.Date) { value = new javaxt.utils.Date(((java.util.Date) value)).toISOString(); } else if (value instanceof java.util.Calendar) { value = new javaxt.utils.Date(((java.util.Calendar) value)).toISOString(); } } str.append(value); } str.append("\r\n"); } x++; } public void includeMetadata(boolean b){ addMetadata = b; } public void setElapsedTime(long elapsedTime){ this.elapsedTime = elapsedTime; } public void setCount(long count){ this.count = count; } public void close(){ isClosed = true; if (format.equals("json")){ str.append("]"); if (addMetadata){ if (metadata!=null){ str.append(",\"metadata\":"); str.append(metadata); } } if (count!=null){ str.append(",\"total_rows\":"); str.append(count); } if (this.elapsedTime!=null){ double elapsedTime = (double)(this.elapsedTime)/1000d; BigDecimal time = new BigDecimal(elapsedTime).setScale(3, BigDecimal.ROUND_HALF_UP); str.append(",\"time\":"); str.append(time); } str.append("}"); } } public String toString(){ if (!isClosed) close(); return str.toString(); } } //************************************************************************** //** list //************************************************************************** /** Returns an unordered list of jobs */ private ServiceResponse list(ServiceRequest request) { User user = (User) request.getUser(); JSONArray arr = new JSONArray(); synchronized (jobs) { Iterator<String> it = jobs.keySet().iterator(); while (it.hasNext()){ String key = it.next(); QueryJob job = jobs.get(key); if (job.userID==user.getID()){ arr.add(job.toJson()); } } } return new ServiceResponse(arr); } //************************************************************************** //** getJob //************************************************************************** /** Used to return the status or results for a given jobID. Example: * [GET] sql/job/{jobID} */ private ServiceResponse getJob(ServiceRequest request) { String id = request.getPath(1).toString(); User user = (User) request.getUser(); QueryJob job = getJob(id, user); if (job==null) return new ServiceResponse(404); return getJobResponse(job); } //************************************************************************** //** getJob //************************************************************************** /** Returns a job for a given jobID and user. Checks both the pending and * completed job queues. */ private QueryJob getJob(String jobID, User user){ synchronized (jobs) { return jobs.get(user.getID() + ":" + jobID); } } //************************************************************************** //** getJobResponse //************************************************************************** /** Used to generate a ServiceResponse for a given job. If a job has failed * or is complete, returns the output of the job. If the job is pending or * running, simply returns the job status. */ private ServiceResponse getJobResponse(QueryJob job){ ServiceResponse response; if (job.status.equals("failed")){ javaxt.io.File file = job.getOutput(); String str = file.getText(); response = new ServiceResponse(500, str); deleteJob(job); } else if (job.status.equals("complete")){ javaxt.io.File file = job.getOutput(); String str = file.getText(); response = new ServiceResponse(str); response.setContentType(file.getContentType()); deleteJob(job); } else{ response = new ServiceResponse(job.status); } return response; } //************************************************************************** //** deleteJob //************************************************************************** /** Removes a job from the queue and deletes any output files that might * have been created with the job. */ private void deleteJob(QueryJob job){ String key = job.getKey(); synchronized(pendingJobs){ pendingJobs.remove(key); pendingJobs.notify(); } synchronized (completedJobs) { completedJobs.remove(key); completedJobs.notify(); } synchronized (jobs) { jobs.remove(key); jobs.notify(); } javaxt.io.File file = job.getOutput(); file.delete(); } //************************************************************************** //** cancel //************************************************************************** /** Used to cancel a pending or running job. */ private ServiceResponse cancel(ServiceRequest request, Database database) { String id = request.getPath(1).toString(); User user = (User) request.getUser(); QueryJob job = getJob(id, user); if (job==null) return new ServiceResponse(404); String key = job.getKey(); synchronized(pendingJobs){ pendingJobs.remove(key); pendingJobs.notify(); } try (Connection conn = database.getConnection()) { //Update job status job.status = "canceled"; job.updated = new javaxt.utils.Date(); notify(job); //Cancel the query in the database Integer pid = getPid(job.getKey(), conn); if (pid!=null){ boolean jobCanceled = false; javaxt.sql.Record record = conn.getRecord("SELECT pg_cancel_backend(" + pid + ")"); if (record!=null) jobCanceled = record.get(0).toBoolean(); if (!jobCanceled){ record = conn.getRecord("SELECT pg_terminate_backend(" + pid + ")"); if (record!=null) jobCanceled = record.get(0).toBoolean(); } if (!jobCanceled){ throw new Exception(); } } //Update queue deleteJob(job); //return response return new ServiceResponse(job.toJson()); } catch(Exception e){ return new ServiceResponse(500, "failed to cancel query"); } } //************************************************************************** //** getPid //************************************************************************** /** Returns process id for a given jobId */ private Integer getPid(String key, Connection conn) throws SQLException { javaxt.sql.Record record = conn.getRecord( "SELECT pid from pg_stat_activity where query like '--" + key + "%'"); return record==null ? null : record.get(0).toInteger(); } //************************************************************************** //** getTables //************************************************************************** /** Returns a list of tables and columns */ public ServiceResponse getTables(ServiceRequest request, Database database) { try { JSONArray arr = new JSONArray(); for (Table table : database.getTables()){ JSONArray columns = new JSONArray(); for (Column column : table.getColumns()){ JSONObject col = new JSONObject(); col.set("name", column.getName()); col.set("type", column.getType()); if (column.isPrimaryKey()){ col.set("primaryKey", true); } columns.add(col); } JSONObject json = new JSONObject(); json.set("name", table.getName()); json.set("schema", table.getSchema()); json.set("columns", columns); arr.add(json); } JSONObject json = new JSONObject(); json.set("tables", arr); return new ServiceResponse(json); } catch(Exception e){ return new ServiceResponse(e); } } //************************************************************************** //** getParameter //************************************************************************** /** Used to extract a parameter either from the URL query string or the json * in the request payload. */ private javaxt.utils.Value getParameter(String name, ServiceRequest request){ if (request.getRequest().getMethod().equals("GET")){ return request.getParameter(name); } else{ JSONObject json = request.getJson(); if (json.has(name)){ return new javaxt.utils.Value(json.get(name).toObject()); } else{ return request.getParameter(name); } } } //************************************************************************** //** QueryJob //************************************************************************** public class QueryJob { private String id; private long userID; private Select select; private Long offset; private LongValue limit; private javaxt.utils.Date created; private javaxt.utils.Date updated; private String status; private String format; private boolean countTotal = false; private boolean addMetadata = false; private CreateTable tempTable; public QueryJob(long userID, Select select, Long offset, Long limit, JSONObject params) { this.id = UUID.randomUUID().toString(); this.userID = userID; this.select = select; this.offset = offset; this.limit = limit==null ? null : new LongValue(limit); this.created = new javaxt.utils.Date(); this.updated = this.created.clone(); this.status = "pending"; String format = params.get("format").toString(); if (format==null) format=""; format = format.trim().toLowerCase(); if (format.equals("csv") || format.equals("tsv")){ this.format = format; } else this.format = "json"; if (params.has("count")){ countTotal = params.get("count").toBoolean(); } if (params.has("metadata")){ addMetadata = params.get("metadata").toBoolean(); } } public String getID(){ return id; } public long getUserID(){ return userID; } public String getStatus(){ return status; } public void addTempTable(CreateTable stmt){ tempTable = stmt; } public CreateTable getTempTable(){ return tempTable; } public String getKey(){ return userID + ":" + id; } public boolean isCanceled(){ return status.equals("canceled"); } public String getQuery(){ PlainSelect plainSelect = (PlainSelect) select.getSelectBody(); //Update offset and limit if (offset!=null){ Offset o = plainSelect.getOffset(); if (o==null) o = new Offset(); o.setOffset(offset); plainSelect.setOffset(o); } if (limit!=null){ Limit l = plainSelect.getLimit(); if (l==null){ l = new Limit(); l.setRowCount(limit); plainSelect.setLimit(l); } } String query = plainSelect.toString(); //Prepend any "with" clause that might be present if (select.getWithItemsList()!=null){ java.util.Iterator<WithItem> i2 = select.getWithItemsList().iterator(); query = "with " + i2.next() + " \r\n" + query; } return query; } public String getCountQuery(){ PlainSelect plainSelect = (PlainSelect) select.getSelectBody(); plainSelect.setSelectItems(selectCount); String query = plainSelect.toString(); //Add prepend any "with" clause that might be present if (!select.getWithItemsList().isEmpty()){ java.util.Iterator<WithItem> i2 = select.getWithItemsList().iterator(); query = "with " + i2.next() + " \r\n" + query; } return query; } public boolean countTotal(){ if (countTotal){ if (format.equals("json")) return true; } return false; } public boolean addMetadata(){ return addMetadata; } public String getOutputFormat(){ return format; } public javaxt.io.File getOutput(){ return new javaxt.io.File(jobDir.toString() + userID + "/" + id + "." + format); } public String getContentType(){ if (format.equals("tsv")){ return "text/plain"; } else if (format.equals("csv")) return "text/csv"; else{ return "application/json"; } } public void log(){ if (logDir!=null){ javaxt.io.File file = new javaxt.io.File(logDir.toString() + userID + "/" + id + ".json"); file.write(toJson().toString()); } } public JSONObject toJson() { JSONObject json = new JSONObject(); json.set("user_id", userID); json.set("job_id", id); json.set("status", status); json.set("query", getQuery()); json.set("created_at", created); json.set("updated_at", updated); return json; } } //************************************************************************** //** QueryProcessor //************************************************************************** /** Thread used to execute queries */ private class QueryProcessor implements Runnable { private Database database; private QueryService queryService; public QueryProcessor(Database database, QueryService queryService){ this.database = database; this.queryService = queryService; } public void run() { while (true) { Object obj = null; synchronized (pendingJobs) { while (pendingJobs.isEmpty()) { try { pendingJobs.wait(); } catch (InterruptedException e) { return; } } obj = pendingJobs.get(0); if (obj!=null) pendingJobs.remove(0); pendingJobs.notifyAll(); } if (obj!=null){ //Find query job String key = (String) obj; QueryJob job = null; synchronized (jobs) { job = jobs.get(key); } if (job!=null && !job.isCanceled()){ Connection conn = null; try{ //Update job status and set start time job.status = "running"; job.updated = new javaxt.utils.Date(); long startTime = System.currentTimeMillis(); queryService.notify(job); //Open database connection conn = database.getConnection(); //Create temp table as needed CreateTable createTempTable = job.getTempTable(); if (createTempTable!=null){ conn.execute("--" + job.getKey() + "\n" + createTempTable.toString()); if (job.isCanceled()){ conn.execute("DROP TABLE " + createTempTable.getTable().getName()); throw new Exception(); } } //Execute query and generate response String query = job.getQuery(); Writer writer = new Writer(job.getOutputFormat(), job.addMetadata()); Recordset rs = conn.getRecordset("--" + job.getKey() + "\n" + query); while (rs.next()){ writer.write(rs); } rs.close(); if (job.isCanceled()) throw new Exception(); //Count total records as needed if (job.countTotal()){ javaxt.sql.Record record = conn.getRecord(job.getCountQuery()); if (record!=null){ Long ttl = record.get(0).toLong(); if (ttl!=null){ writer.setCount(ttl); } } } if (job.isCanceled()) throw new Exception(); //Drop temp table if (createTempTable!=null){ conn.execute("DROP TABLE " + createTempTable.getTable().getName()); } if (job.isCanceled()) throw new Exception(); //Close database connection conn.close(); //Set elapsed time writer.setElapsedTime(System.currentTimeMillis()-startTime); //Write output to a file javaxt.io.File file = job.getOutput(); file.write(writer.toString()); //Update job status job.status = "complete"; job.updated = new javaxt.utils.Date(); queryService.notify(job); } catch(Exception e){ if (conn!=null) conn.close(); javaxt.io.File file = job.getOutput(); if (job.isCanceled()){ file.delete(); } else{ job.status = "failed"; job.updated = new javaxt.utils.Date(); queryService.notify(job); java.io.PrintStream ps = null; try { file.create(); ps = new java.io.PrintStream(file.toFile()); e.printStackTrace(ps); ps.close(); } catch (Exception ex) { if (ps!=null) ps.close(); file.write(e.getMessage()); } } } //Add job to the completedJobs if (!job.isCanceled()){ synchronized(completedJobs){ completedJobs.add(job.getKey()); completedJobs.notify(); } } } } else{ return; } } } } } |