目录
01
—
背景介绍
02
—
使用介绍
(1)引入依赖模块
<dependency>
    <groupId>com.webank.wedatasphere.linkis</groupId>
    <artifactId>linkis-ujes-jdbc</artifactId>
    <version>0.9.1</version>
</dependency>
(2)建立测试类
public static void main(String[] args) throws SQLException, ClassNotFoundException {//1. 加载驱动类:Class.forName("com.webank.wedatasphere.linkis.ujes.jdbc.UJESSQLDriver");//2. 获得连接:jdbc:linkis://gatewayIP:gatewayPort 帐号和密码对应前端的帐号密码Connection connection = DriverManager.getConnection("jdbc:linkis://127.0.0.1:9001","username","password");//3. 创建statement 和执行查询Statement st= connection.createStatement();ResultSet rs=st.executeQuery("show tables");//4.处理数据库的返回结果(使用ResultSet类)while (rs.next()) {ResultSetMetaData metaData = rs.getMetaData();for (int i = 1; iSystem.out.print(metaData.getColumnName(i) + ":" +metaData.getColumnTypeName(i)+": "+ rs.getObject(i) + " ");}System.out.println();}//关闭资源rs.close();st.close();connection.close();}

图 2-1 Linkis JDBC 任务执行结果
03
—
模块设计方案
UJESSQLDriver
UJESSQLConnection
UJESSQLStatement
UJESSQLPreStatement
UJESSQLResultSet

static {try {DriverManager.registerDriver(new UJESSQLDriver());} catch (SQLException e) {Logger logger = LoggerFactory.getLogger(UJESSQLDriver.class);logger.info("Load driver failed",e);}}
(2)JDBC连接器UJESSQLConnection
// Obtain the connection through DriverManager using a jdbc:linkis:// URL plus user name and password, then cast it to UJESSQLConnection.
conn = (UJESSQLConnection) DriverManager.getConnection("jdbc:linkis://hostname:port","username","password")
/**
 * Opens a Linkis connection for `url`.
 *
 * The properties parsed out of the URL are merged into `info` (or into a fresh
 * Properties when `info` is null), a UJES client is looked up for them, and both
 * are handed to a new UJESSQLConnection.
 *
 * @param url  the JDBC URL; must be accepted by `acceptsURL`
 * @param info caller-supplied connection properties, may be null
 * @return a new UJESSQLConnection
 * @throws UJESSQLException with BAD_URL when the URL is not accepted or cannot be parsed
 */
override def connect(url: String, info: Properties): Connection =
  if (acceptsURL(url)) {
    // NOTE(review): a non-null `info` is reused and mutated by putAll rather than
    // copied - confirm callers do not share one Properties across different URLs.
    val props = if (info != null) info else new Properties
    props.putAll(parseURL(url))
    val ujesClient = UJESClientFactory.getUJESClient(props)
    new UJESSQLConnection(ujesClient, props)
  } else throw new UJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url: " + url)

/**
 * Splits a JDBC URL into connection properties.
 *
 * Always records the raw URL under "URL"; HOST, PORT, DB_NAME and PARAMS are set
 * only when the corresponding URL_REGEX group is present. USER and PASSWORD
 * entries found among the URL parameters are lifted into their own properties and
 * removed from PARAMS.
 *
 * @param url the JDBC URL to parse
 * @return the extracted properties
 * @throws UJESSQLException with BAD_URL when the URL or one of its parameters is malformed
 */
private def parseURL(url: String): Properties = {
  val props = new Properties
  // add an entry to get url
  props.setProperty("URL", url)
  url match {
    case URL_REGEX(host, port, db, params) =>
      if (StringUtils.isNotBlank(host)) props.setProperty(HOST, host)
      // substring(1) drops the first character of the captured group
      // (presumably the separator matched by URL_REGEX - verify against the regex).
      if (StringUtils.isNotBlank(port)) props.setProperty(PORT, port.substring(1))
      if (StringUtils.isNotBlank(db) && db.length > 1) props.setProperty(DB_NAME, db.substring(1))
      if (StringUtils.isNotBlank(params) && params.length > 1) {
        val _params = params.substring(1)
        // The filter doubles as a dispatcher: USER/PASSWORD pairs are stored as
        // dedicated properties and dropped (false); every other well-formed pair is kept.
        val kvs = _params.split(PARAM_SPLIT).map(_.split(KV_SPLIT)).filter {
          case Array(USER, value) =>
            props.setProperty(USER, value)
            false
          case Array(PASSWORD, value) =>
            props.setProperty(PASSWORD, value)
            false
          case Array(key, _) =>
            if (StringUtils.isBlank(key)) {
              throw new UJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url for params: " + url)
            } else true
          // Anything that is not exactly a key/value pair is rejected.
          case _ => throw new UJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url for params: " + url)
        }
        props.setProperty(PARAMS, kvs.map(_.mkString(KV_SPLIT)).mkString(PARAM_SPLIT))
      }
    case _ => throw new UJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url: " + url)
  }
  props
}
/**
 * Returns the UJES client for the server described by `props`, creating and
 * caching it in `ujesClients` on first use.
 *
 * The cache key is "http://host:port", or "http://host" when no port is set.
 *
 * @param props connection properties; HOST and PORT are read from it
 * @return the cached or newly created client for that server URL
 */
def getUJESClient(props: Properties): UJESClient = {
  val host = props.getProperty(HOST)
  val port = props.getProperty(PORT)
  val serverUrl = if (StringUtils.isNotBlank(port)) s"http://$host:$port" else "http://" + host

  // A single get() replaces the containsKey + get pair, so an entry can never be
  // observed as present and then read back as null.
  Option(ujesClients.get(serverUrl)).getOrElse {
    // Lock on the interned URL string, as the original did, and re-check under
    // the lock before creating the client.
    serverUrl.intern().synchronized {
      Option(ujesClients.get(serverUrl)).getOrElse {
        val ujesClient = createUJESClient(serverUrl, props)
        ujesClients.put(serverUrl, ujesClient)
        ujesClient
      }
    }
  }
}
(3)执行对象UJESSQLStatement/UJESSQLPreStatement
//获取执行对象UJESSQLStatementstatement = (UJESSQLStatementCon) conn.createStatement;//获取预执行对象UJESSQLPrepareStatementpreStatement = (UJESSQLPrepareStatement) conn.prePareStatement;
/**
 * Submits `sql` as a Linkis job and blocks until the job completes, the statement
 * is closed, or the wait limit derived from `queryTimeout` is exceeded.
 *
 * @param sql the statement text; every registered pre-execution hook may rewrite it
 *            before it is submitted
 * @return true when the finished job provides a result-set list (a UJESSQLResultSet
 *         is then stored in `resultSet`), false when it provides none
 * @throws UJESSQLException with QUERY_TIMEOUT when waiting times out, or with
 *                          STATEMENT_CLOSED when the statement was closed meanwhile
 * @throws ErrorException   when the job status reports failure; carries the error
 *                          code and description from the job info
 */
override def execute(sql: String): Boolean = throwWhenClosed {
  import scala.collection.JavaConverters._

  // Pre-execution hooks: each hook may convert SQL that is not supported as written.
  var parsedSQL = sql
  JDBCDriverPreExecutionHook.getPreExecutionHooks.foreach { preExecution =>
    parsedSQL = preExecution.callPreExecutionHook(parsedSQL)
  }

  // Build the Linkis job action that will carry the statement.
  val action = JobExecuteAction.builder()
    .setEngineType(EngineType.SPARK)
    .addExecuteCode(parsedSQL)
    .setCreator(ujesSQLConnection.creator)
    .setUser(ujesSQLConnection.user)
  if (ujesSQLConnection.variableMap.nonEmpty) {
    action.setVariableMap(ujesSQLConnection.variableMap.asJava)
  }

  // Submit the SQL job through the UJES client.
  jobExecuteResult = ujesSQLConnection.ujesClient.execute(action.build())
  queryEnd = false

  // NOTE(review): queryTimeout is interpreted as milliseconds here, while JDBC's
  // setQueryTimeout is specified in seconds - confirm the setter converts.
  val atMost = if (queryTimeout > 0) Duration(queryTimeout, TimeUnit.MILLISECONDS) else Duration.Inf

  // Shared wait: polls `isDone` via Utils.waitUntil and maps a timeout to
  // QUERY_TIMEOUT, clearing the query first when a timeout was configured.
  // Any other throwable is passed through unchanged.
  def awaitUntil(isDone: () => Boolean): Unit = Utils.tryThrow {
    Utils.waitUntil(isDone, atMost, 100, 10000)
  } {
    case t: TimeoutException =>
      if (queryTimeout > 0) clearQuery()
      new UJESSQLException(UJESSQLErrorCode.QUERY_TIMEOUT, "query has been timeout!").initCause(t)
    case t => t
  }

  // Phase 1: wait until the job status is completed (or the statement is closed).
  var status = ujesSQLConnection.ujesClient.status(jobExecuteResult)
  if (!status.isCompleted) awaitUntil { () =>
    status = ujesSQLConnection.ujesClient.status(jobExecuteResult)
    status.isCompleted || closed
  }

  if (!closed) {
    var jobInfo = ujesSQLConnection.ujesClient.getJobInfo(jobExecuteResult)
    if (status.isFailed) {
      throw new ErrorException(jobInfo.getRequestPersistTask.getErrCode, jobInfo.getRequestPersistTask.getErrDesc)
    }

    // Phase 2: the job info may lag behind the status, so wait until it reports
    // one of the terminal states as well.
    val jobInfoStatus = jobInfo.getJobStatus
    if (!jobInfoStatus.equals("Succeed")) awaitUntil { () =>
      jobInfo = ujesSQLConnection.ujesClient.getJobInfo(jobExecuteResult)
      val state = jobInfo.getJobStatus match {
        case "Failed" | "Cancelled" | "Timeout" | "Succeed" => true
        case _ => false
      }
      state || closed
    }

    // Fetch the result-set list and expose it as a JDBC ResultSet.
    val resultSetList = jobInfo.getResultSetList(ujesSQLConnection.ujesClient)
    queryEnd = true
    if (resultSetList != null) {
      resultSet = new UJESSQLResultSet(resultSetList, this, maxRows, fetchSize)
      true
    } else false
  } else throw new UJESSQLException(UJESSQLErrorCode.STATEMENT_CLOSED, "Statement is closed.")
}
/** Loads the result file, then the column metadata, then the row data, in that order. */
private def init(): Unit = {
  resultSetResultInit()
  metaDataInit()
  resultSetInit()
}

/**
 * Resolves the result-set path if it is not set yet and, when the path is not
 * blank, fetches the result through the UJES client into `resultSetResult`.
 */
private def resultSetResultInit(): Unit = {
  if (path == null) path = getResultSetPath(resultSetList)
  val user = connection.getProps.getProperty("user")
  if (StringUtils.isNotBlank(path)) {
    val resultAction = ResultSetAction.builder.setUser(user).setPath(path).build()
    resultSetResult = connection.ujesClient.resultSet(resultAction)
  }
}

/**
 * Copies column name, data type and comment of every column from the fetched
 * result into `resultSetMetaData`. Does nothing when no result was fetched.
 */
private def metaDataInit(): Unit = {
  if (null != resultSetResult) {
    metaData = resultSetResult.getMetadata.asInstanceOf[util.List[util.Map[String, String]]]
    // The metadata setters are addressed with 1-based column positions; the
    // backing list is 0-based.
    for (cursor <- 1 to metaData.size()) {
      val col = metaData.get(cursor - 1)
      resultSetMetaData.setColumnNameProperties(cursor, col.get("columnName"))
      resultSetMetaData.setDataTypeProperties(cursor, col.get("dataType"))
      resultSetMetaData.setCommentPropreties(cursor, col.get("comment"))
    }
  }
}

/** Stores the fetched file content as the row list. Does nothing when no result was fetched. */
private def resultSetInit(): Unit = {
  if (null != resultSetResult) {
    resultSetRow = resultSetResult.getFileContent.asInstanceOf[util.ArrayList[util.ArrayList[String]]]
  }
}
/**
 * Advances the cursor by one row.
 *
 * The result data is loaded on demand: `init()` runs whenever `metaData` is still null.
 *
 * @return true when the cursor now points at an existing row (which is then made
 *         the current row), false when there are no rows or the cursor moved past the last one
 */
override def next(): Boolean = {
  if (metaData == null) init()
  currentRowCursor += 1
  // Equivalent to the original "cursor > size - 1" end-of-data check.
  val hasRow = resultSetRow != null && currentRowCursor < resultSetRow.size()
  if (hasRow) updateCurrentRow(currentRowCursor)
  hasRow
}
/**
 * Returns the value of the given column of the current row as a String.
 *
 * @param columnIndex the column position passed on to `getColumnValue`
 * @return the column value cast to String
 * @throws UJESSQLException with RESULTSET_ROWERROR when `wasNull()` reports null
 *                          after the value was read
 */
override def getString(columnIndex: Int): String = {
  // The value must be read first: wasNull() is evaluated after getColumnValue.
  val any = getColumnValue(columnIndex)
  // NOTE(review): java.sql.ResultSet#getString is specified to return null for
  // SQL NULL; this implementation throws instead - confirm that is intended.
  if (wasNull()) throw new UJESSQLException(UJESSQLErrorCode.RESULTSET_ROWERROR, "Type is null")
  else any.asInstanceOf[String]
}
(5)错误码方案
/**
 * Error codes raised by the Linkis JDBC driver. Each constant pairs a numeric
 * code (80000-80019) with a default message.
 */
public enum UJESSQLErrorCode {
    BAD_URL(80000, "bad url"),
    NOSUPPORT_DRIVER(80001, "this method not supported in driver"),
    NOSUPPORT_CONNECTION(80002, "this method not supported in connection"),
    NOSUPPORT_STATEMENT(80003, "this method not supported in statement"),
    CONNECTION_CLOSED(80004, "Connection is closed!"),
    STATEMENT_CLOSED(80005, "statement is closed!"),
    SCHEMA_EMPTY(80006, "schema is empty!"),
    SCHEMA_FAILED(80007, "Get schema failed!"),
    QUERY_TIMEOUT(80008, "query has been timeout!"),
    FILETYPE_ERROR(80009, "file type error"),
    METADATATYPE_ERROR(80010, "metadata type error"),
    NOSUPPORT_METADATA(80011, "this method not supported in DatabaseMetaData"),
    NOPERMITION(80012, "This user has no permission to read this file!"),
    PARAMS_NOT_FOUND(80013, "Parameter not found"),
    ERRORINFO_FROM_JOBINFO(80014, "get errorinfo from jobInfo"),
    RESULTSET_ROWERROR(80015, "row message error"),
    NOSUPPORT_RESULTSET(80016, "this method not supported in resultSet"),
    RESULTSET_NULL(80017, "resultset is null,try to run next() firstly to init ResultSet and MetaData"),
    PREPARESTATEMENT_TYPEERROR(80018, "parameter type error"),
    METADATA_EMPTY(80019, "data is empty");

    // Both fields are assigned once in the constructor and never changed afterwards.
    private final String msg;
    private final int code;

    UJESSQLErrorCode(int code, String msg) {
        this.code = code;
        this.msg = msg;
    }

    /** @return the default message of this error code */
    public String getMsg() {
        return msg;
    }

    /** @return the numeric value of this error code */
    public int getCode() {
        return code;
    }
}
04
—
实现方案总结

