`
wu_quanyin
  • 浏览: 204442 次
  • 性别: Icon_minigender_1
  • 来自: 福建省
社区版块
存档分类
最新评论

Mysql JDBC驱动源码分析(Statement,ResultSet的创建)四

 
阅读更多

一,当连接创建完成时,接着就创建Statement进行sql查询,并返回相应的ResultSet

进入ConnectionImpl类下的,createStatement(..)

 

 

	
/**
 * Creates a statement for the given result set type, concurrency and
 * holdability.
 *
 * The holdability argument is only validated, never stored: when
 * {@code getPedantic()} is true, any value other than
 * {@link java.sql.ResultSet#HOLD_CURSORS_OVER_COMMIT} is rejected. In all
 * other cases the call is delegated to the two-argument overload.
 *
 * @param resultSetType        one of the {@code java.sql.ResultSet} type constants
 * @param resultSetConcurrency one of the {@code java.sql.ResultSet} concurrency constants
 * @param resultSetHoldability one of the {@code java.sql.ResultSet} holdability constants
 * @return the statement produced by {@code createStatement(int, int)}
 * @throws SQLException if pedantic mode is on and an unsupported
 *                      holdability is requested, or if the delegate fails
 */
public java.sql.Statement createStatement(int resultSetType,
		int resultSetConcurrency, int resultSetHoldability)
		throws SQLException {
	// Only pedantic mode enforces the holdability restriction.
	if (getPedantic()
			&& resultSetHoldability != java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT) {
		// The message now spells the JDBC constant correctly
		// (it used to read "HOLD_CUSRORS_OVER_COMMIT").
		throw SQLError.createSQLException(
				"HOLD_CURSORS_OVER_COMMIT is only supported holdability level",
				SQLError.SQL_STATE_ILLEGAL_ARGUMENT, getExceptionInterceptor());
	}

	return createStatement(resultSetType, resultSetConcurrency);
}





/**
 * Creates a statement configured with the requested result set type and
 * concurrency.
 *
 * @param resultSetType        one of the {@code java.sql.ResultSet} type constants
 * @param resultSetConcurrency one of the {@code java.sql.ResultSet} concurrency constants
 * @return a new {@code StatementImpl} for this connection's current database
 * @throws SQLException if {@code checkClosed()} rejects the call or the
 *                      statement cannot be created
 */
public java.sql.Statement createStatement(int resultSetType,
		int resultSetConcurrency) throws SQLException {
	checkClosed();

	// The statement is handed the object returned by getLoadBalanceSafeProxy()
	// as its connection (presumably a proxy-safe view of this connection).
	final StatementImpl statement =
			new StatementImpl(getLoadBalanceSafeProxy(), this.database);

	statement.setResultSetType(resultSetType);
	statement.setResultSetConcurrency(resultSetConcurrency);
	return statement;
}
 

进入StatementImpl.executeQuery(..)

 

/**
 * Executes the given SQL and returns the result set it produces.
 *
 * Outline of the visible steps, all performed while holding both this
 * statement's monitor and the connection's monitor:
 * <ol>
 * <li>validate the statement and the query text, and reset cancel state;</li>
 * <li>optionally raise the server's net_write_timeout for streaming results;</li>
 * <li>apply JDBC escape processing when enabled;</li>
 * <li>short-circuit to a ping when the SQL starts with PING_MARKER;</li>
 * <li>close the previous result set unless the connection is configured to
 *     keep results open;</li>
 * <li>execute through server-side fetch, or through
 *     {@code locallyScopedConn.execSQL(..)}, honouring maxRows, the query
 *     timeout and the statement's catalog;</li>
 * <li>translate a cancellation into the matching exception;</li>
 * <li>record the last insert id and populate the result set metadata cache.</li>
 * </ol>
 *
 * @param sql the query text to execute
 * @return the result set stored in {@code this.results}
 * @throws SQLException if validation or execution fails, or if the
 *                      statement was cancelled or timed out
 */
public synchronized java.sql.ResultSet executeQuery(String sql)
			throws SQLException {
		checkClosed();
		// The connection object this statement was created with
		MySQLConnection locallyScopedConn = this.connection;

		synchronized (locallyScopedConn) {
			this.retrieveGeneratedKeys = false;
			
			resetCancelledState();

			checkNullOrEmptyQuery(sql);

			//We only stream result sets when they are forward-only, read-only, and the fetch size has been set to Integer.MIN_VALUE
			boolean doStreaming = createStreamingResultSet();


			// For streaming results with a positive configured timeout, send
			// "SET net_write_timeout=<value>" to the server before the query.
			if (doStreaming
					&& this.connection.getNetTimeoutForStreamingResults() > 0) {
				executeSimpleNonQuery(locallyScopedConn, "SET net_write_timeout="
						+ this.connection.getNetTimeoutForStreamingResults());
			}

			// escapeSQL returns either a plain String or an
			// EscapeProcessorResult wrapping the rewritten SQL.
			if (this.doEscapeProcessing) {
				Object escapedSqlResult = EscapeProcessor.escapeSQL(sql,
						locallyScopedConn.serverSupportsConvertFn(), this.connection);

				if (escapedSqlResult instanceof String) {
					sql = (String) escapedSqlResult;
				} else {
					sql = ((EscapeProcessorResult) escapedSqlResult).escapedSql;
				}
			}

			char firstStatementChar = StringUtils.firstNonWsCharUc(sql,
					findStartOfStatement(sql));

			// A query beginning with PING_MARKER is answered by
			// doPingInstead(); the current this.results is returned as is.
			if (sql.charAt(0) == '/') {
				if (sql.startsWith(PING_MARKER)) {
					doPingInstead();
				
					return this.results;
				}
			}
			
			checkForDml(sql, firstStatementChar);

			// Close the result set of the previous execution unless the
			// connection asks for results to be held open.
			if (this.results != null) {
				if (!locallyScopedConn.getHoldResultsOpenOverStatementClose()) {
					this.results.realClose(false);
				}
			}

			CachedResultSetMetaData cachedMetaData = null;

			// If there isn't a limit clause in the SQL
			// then limit the number of rows to return in
			// an efficient manner. Only do this if
			// setMaxRows() hasn't been used on any Statements
			// generated from the current Connection (saves
			// a query, and network traffic).

			if (useServerFetch()) {
				this.results = createResultSetUsingServerFetch(sql);

				return this.results;
			}

			CancelTask timeoutTask = null;

			// Non-null only when the connection's catalog was switched below
			// and therefore has to be restored afterwards.
			String oldCatalog = null;

			try {
				// Schedule a cancel task only when query timeouts are enabled,
				// a timeout is set, and versionMeetsMinimum(5, 0, 0) holds.
				if (locallyScopedConn.getEnableQueryTimeouts() &&
						this.timeoutInMillis != 0
						&& locallyScopedConn.versionMeetsMinimum(5, 0, 0)) {
					timeoutTask = new CancelTask(this);
					locallyScopedConn.getCancelTimer().schedule(timeoutTask,
							this.timeoutInMillis);
				}

				// Run the query under this statement's catalog, remembering
				// the connection's catalog so it can be restored.
				if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) {
					oldCatalog = locallyScopedConn.getCatalog();
					locallyScopedConn.setCatalog(this.currentCatalog);
				}

				//
				// Check if we have cached metadata for this query...
				//

				Field[] cachedFields = null;

				// Whether result set metadata caching is enabled
				if (locallyScopedConn.getCacheResultSetMetadata()) {
					cachedMetaData = locallyScopedConn.getCachedMetaData(sql);

					if (cachedMetaData != null) {
						cachedFields = cachedMetaData.fields;
					}
				}

				if (locallyScopedConn.useMaxRows()) {
					// We need to execute this all together
					// So synchronize on the Connection's mutex (because
					// even queries going through there synchronize
					// on the connection
					if (StringUtils.indexOfIgnoreCase(sql, "LIMIT") != -1) { //$NON-NLS-1$
						// The SQL mentions LIMIT: pass maxRows to execSQL
						// instead of setting SQL_SELECT_LIMIT.
						// NOTE(review): statementBegins() is not called on this
						// path, unlike the two other execSQL paths - confirm
						// whether that is intentional.
						this.results = locallyScopedConn.execSQL(this, sql,
								this.maxRows, null, this.resultSetType,
								this.resultSetConcurrency,
								doStreaming,
								this.currentCatalog, cachedFields);
					} else {
						// No LIMIT in the SQL: set SQL_SELECT_LIMIT on the
						// server (DEFAULT when maxRows is not positive).
						if (this.maxRows <= 0) {
							executeSimpleNonQuery(locallyScopedConn,
									"SET OPTION SQL_SELECT_LIMIT=DEFAULT");
						} else {
							executeSimpleNonQuery(locallyScopedConn,
											"SET OPTION SQL_SELECT_LIMIT=" + this.maxRows);
						}

						statementBegins();
						
						this.results = locallyScopedConn.execSQL(this, sql, -1,
								null, this.resultSetType,
								this.resultSetConcurrency,
								doStreaming,
								this.currentCatalog, cachedFields);

						// The catalog is restored here and again in the
						// finally block below.
						if (oldCatalog != null) {
							locallyScopedConn.setCatalog(oldCatalog);
						}
					}
				} else {
					statementBegins();
					// Execute the SQL through the connection (ConnectionImpl.execSQL)
					this.results = locallyScopedConn.execSQL(this, sql, -1, null,
							this.resultSetType, this.resultSetConcurrency,
							doStreaming,
							this.currentCatalog, cachedFields);
				}

				// The query finished: surface any exception the cancel task
				// caught, then cancel the task and purge the timer.
				if (timeoutTask != null) {
					if (timeoutTask.caughtWhileCancelling != null) {
						throw timeoutTask.caughtWhileCancelling;
					}

					timeoutTask.cancel();
					
					locallyScopedConn.getCancelTimer().purge();
					
					timeoutTask = null;
				}

				// Report a cancellation as a timeout or as an explicit cancel,
				// depending on wasCancelledByTimeout.
				synchronized (this.cancelTimeoutMutex) {
					if (this.wasCancelled) {
						SQLException cause = null;
						
						if (this.wasCancelledByTimeout) {
							cause = new MySQLTimeoutException();
						} else {
							cause = new MySQLStatementCancelledException();
						}
						
						resetCancelledState();
						
						throw cause;
					}
				}
			} finally {
				this.statementExecuting.set(false);
				
				// Still non-null only when the try block exited early.
				if (timeoutTask != null) {
					timeoutTask.cancel();
					
					locallyScopedConn.getCancelTimer().purge();
				}

				if (oldCatalog != null) {
					locallyScopedConn.setCatalog(oldCatalog);
				}
			}

			this.lastInsertId = this.results.getUpdateID();

			// Initialise the result set's metadata from the cache entry, or,
			// when caching is enabled but no entry existed, pass null so the
			// entry is created.
			if (cachedMetaData != null) {
				locallyScopedConn.initializeResultsMetadataFromCache(sql, cachedMetaData,
						this.results);
			} else {
				if (this.connection.getCacheResultSetMetadata()) {
					locallyScopedConn.initializeResultsMetadataFromCache(sql,
							null /* will be created */, this.results);
				}
			}

			return this.results;
		}
	}

  进入ConnectionImpl.execSQL(..)

 

/**
 * Sends a query to the server through {@code this.io.sqlQueryDirect(..)}
 * and returns its result.
 *
 * The query is taken from {@code sql} when {@code packet} is null;
 * otherwise the pre-built {@code packet} is sent and {@code sql} is used
 * only for error reporting.
 *
 * @param callingStatement     statement on whose behalf the query runs
 * @param sql                  query text; sent only when {@code packet} is null
 * @param maxRows              row limit forwarded to the I/O layer
 * @param packet               pre-built query packet, or null
 * @param resultSetType        requested result set type
 * @param resultSetConcurrency requested result set concurrency
 * @param streamResults        whether rows should be streamed
 * @param catalog              catalog forwarded to the I/O layer
 * @param cachedMetadata       cached field metadata, or null
 * @param isBatch              true for batch execution; suppresses the
 *                             pre-query ping
 * @return the result produced by the I/O layer
 * @throws SQLException on any failure; non-SQL exceptions are wrapped
 */
public synchronized ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows,
		Buffer packet, int resultSetType, int resultSetConcurrency,
		boolean streamResults, String catalog,
		Field[] cachedMetadata,
		boolean isBatch) throws SQLException {

	// Position of the caller-supplied packet, kept so the SQL can be
	// extracted from it if an exception has to be reported.
	final int endOfQueryPacketPosition = (packet != null) ? packet.getPosition() : 0;

	final long queryStartTime = getGatherPerformanceMetrics() ? System.currentTimeMillis() : 0;

	this.lastQueryFinishedTime = 0; // we're busy!

	final boolean pingBeforeQuery = getHighAvailability()
			&& (this.autoCommit || getAutoReconnectForPools())
			&& this.needsPing && !isBatch;

	if (pingBeforeQuery) {
		try {
			pingInternal(false, 0);

			this.needsPing = false;
		} catch (Exception pingFailure) {
			// A failed ping is not reported; a new I/O channel is created instead.
			createNewIO(true);
		}
	}

	try {
		String queryText = null;
		String encoding = null;

		if (packet == null) {
			// Text query: send the SQL itself, encoded when Unicode is in use.
			queryText = sql;

			if (getUseUnicode()) {
				encoding = getEncoding();
			}
		}

		// Hand over to MysqlIO to run the query; exactly one of
		// queryText / packet is non-null here.
		return this.io.sqlQueryDirect(callingStatement, queryText, encoding,
				packet, maxRows, resultSetType,
				resultSetConcurrency, streamResults, catalog,
				cachedMetadata);
	} catch (java.sql.SQLException sqlE) {
		// don't clobber SQL exceptions
		java.sql.SQLException toThrow = sqlE;

		if (getDumpQueriesOnException()) {
			final String extractedSql = extractSqlFromPacket(sql, packet,
					endOfQueryPacketPosition);
			final String dump = new StringBuffer(extractedSql.length() + 32)
					.append("\n\nQuery being executed when exception was thrown:\n")
					.append(extractedSql)
					.append("\n\n")
					.toString();

			toThrow = appendMessageToException(sqlE, dump, getExceptionInterceptor());
		}

		if (getHighAvailability()) {
			this.needsPing = true;
		} else if (SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE.equals(toThrow.getSQLState())) {
			cleanup(toThrow);
		}

		throw toThrow;
	} catch (Exception ex) {
		if (getHighAvailability()) {
			this.needsPing = true;
		} else if (ex instanceof IOException) {
			cleanup(ex);
		}

		// Wrap anything that is not a SQLException, keeping it as the cause.
		final SQLException wrapped = SQLError.createSQLException(
				Messages.getString("Connection.UnexpectedException"),
				SQLError.SQL_STATE_GENERAL_ERROR, getExceptionInterceptor());
		wrapped.initCause(ex);

		throw wrapped;
	} finally {
		if (getMaintainTimeStats()) {
			this.lastQueryFinishedTime = System.currentTimeMillis();
		}

		if (getGatherPerformanceMetrics()) {
			registerQueryExecutionTime(System.currentTimeMillis() - queryStartTime);
		}
	}
}
 

 

进入MysqlIO.sqlQueryDirect(..)中执行生成ResultSet

 

  /**
   * Sends a query to the server and builds the result set from the reply.
   *
   * The query is supplied either as text ({@code query}, which is encoded
   * into the reusable {@code sendPacket}) or as a pre-built
   * {@code queryPacket}. Statement interceptors, when installed, are invoked
   * before and after execution, and a non-null result from the "pre" hook
   * replaces the execution entirely. Profiling and slow-query events are
   * emitted when {@code profileSql} / {@code logSlowQueries} are set.
   *
   * All elapsed-time values (query start, query end, fetch end) are read
   * from {@code getCurrentTimeNanosOrMillis()} so that their differences are
   * taken between readings of the same clock. Previously the query end time
   * came from {@code System.currentTimeMillis()}, which mixed units with the
   * other two readings whenever that helper does not return milliseconds.
   *
   * @param callingStatement     statement on whose behalf the query runs; may be null
   * @param query                query text, or null when {@code queryPacket} is supplied
   * @param characterEncoding    encoding for the query text, or null
   * @param queryPacket          pre-built query packet; replaced by the
   *                             internally built packet when {@code query} is non-null
   * @param maxRows              row limit forwarded to {@code readAllResults}
   * @param resultSetType        requested result set type
   * @param resultSetConcurrency requested result set concurrency
   * @param streamResults        whether rows should be streamed
   * @param catalog              catalog recorded in events and result sets
   * @param cachedMetadata       cached field metadata, or null
   * @return the result set, possibly replaced by a statement interceptor
   * @throws Exception if sending the query or reading the results fails; a
   *                   cancelled statement surfaces as MySQLTimeoutException
   *                   or MySQLStatementCancelledException
   */
  final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query,
          String characterEncoding, Buffer queryPacket, int maxRows,
          int resultSetType, int resultSetConcurrency,
          boolean streamResults, String catalog, Field[] cachedMetadata)
      throws Exception {
      this.statementExecutionDepth++;

      try {
          if (this.statementInterceptors != null) {
              ResultSetInternalMethods interceptedResults =
                  invokeStatementInterceptorsPre(query, callingStatement, false);

              // An interceptor may answer the query itself.
              if (interceptedResults != null) {
                  return interceptedResults;
              }
          }

          long queryStartTime = 0;
          long queryEndTime = 0;

          String statementComment = this.connection.getStatementComment();

          if (this.connection.getIncludeThreadNamesAsStatementComment()) {
              statementComment = (statementComment != null ? statementComment + ", " : "") + "java thread: " + Thread.currentThread().getName();
          }

          if (query != null) {
              // We don't know exactly how many bytes we're going to get
              // from the query. Since we're dealing with Unicode, the
              // max is 2, so pad it (2 * query) + space for headers
              int packLength = HEADER_LENGTH + 1 + (query.length() * 2) + 2;

              byte[] commentAsBytes = null;

              if (statementComment != null) {
                  commentAsBytes = StringUtils.getBytes(statementComment, null,
                          characterEncoding, this.connection
                          .getServerCharacterEncoding(),
                          this.connection.parserKnowsUnicode(), getExceptionInterceptor());

                  packLength += commentAsBytes.length;
                  packLength += 6; // for /*[space] [space]*/
              }

              // sendPacket carries the command to the server; it is
              // allocated once and cleared for reuse afterwards.
              if (this.sendPacket == null) {
                  this.sendPacket = new Buffer(packLength);
              } else {
                  this.sendPacket.clear();
              }

              this.sendPacket.writeByte((byte) MysqlDefs.QUERY);

              if (commentAsBytes != null) {
                  this.sendPacket.writeBytesNoNull(Constants.SLASH_STAR_SPACE_AS_BYTES);
                  this.sendPacket.writeBytesNoNull(commentAsBytes);
                  this.sendPacket.writeBytesNoNull(Constants.SPACE_STAR_SLASH_SPACE_AS_BYTES);
              }

              if (characterEncoding != null) {
                  if (this.platformDbCharsetMatches) {
                      this.sendPacket.writeStringNoNull(query, characterEncoding,
                              this.connection.getServerCharacterEncoding(),
                              this.connection.parserKnowsUnicode(),
                              this.connection);
                  } else {
                      // "LOAD DATA" statements are written with
                      // StringUtils.getBytes(query) instead of the
                      // requested character encoding.
                      if (StringUtils.startsWithIgnoreCaseAndWs(query, "LOAD DATA")) { //$NON-NLS-1$
                          this.sendPacket.writeBytesNoNull(StringUtils.getBytes(query));
                      } else {
                          this.sendPacket.writeStringNoNull(query,
                                  characterEncoding,
                                  this.connection.getServerCharacterEncoding(),
                                  this.connection.parserKnowsUnicode(),
                                  this.connection);
                      }
                  }
              } else {
                  this.sendPacket.writeStringNoNull(query);
              }

              // From here on the internally built packet is the query packet.
              queryPacket = this.sendPacket;
          }

          byte[] queryBuf = null;
          int oldPacketPosition = 0;

          if (needToGrabQueryFromPacket) {
              queryBuf = queryPacket.getByteBuffer();

              // save the packet position
              oldPacketPosition = queryPacket.getPosition();

              queryStartTime = getCurrentTimeNanosOrMillis();
          }

          if (this.autoGenerateTestcaseScript) {
              String testcaseQuery = null;

              if (query != null) {
                  if (statementComment != null) {
                      testcaseQuery = "/* " + statementComment + " */ " + query;
                  } else {
                      testcaseQuery = query;
                  }
              } else {
                  // The query text is taken from offset 5 of the packet bytes
                  // (presumably past the packet header and command byte).
                  testcaseQuery = StringUtils.toString(queryBuf, 5,
                          (oldPacketPosition - 5));
              }

              StringBuffer debugBuf = new StringBuffer(testcaseQuery.length() + 32);
              this.connection.generateConnectionCommentBlock(debugBuf);
              debugBuf.append(testcaseQuery);
              debugBuf.append(';');
              this.connection.dumpTestcaseQuery(debugBuf.toString());
          }

          // Send query command and sql query string; the returned packet
          // is the server's reply.
          Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket,
                  false, null, 0);

          long fetchBeginTime = 0;
          long fetchEndTime = 0;

          String profileQueryToLog = null;

          boolean queryWasSlow = false;

          if (this.profileSql || this.logSlowQueries) {
              // Same clock as queryStartTime and fetchEndTime, so that the
              // elapsed-time subtractions below do not mix units.
              queryEndTime = getCurrentTimeNanosOrMillis();

              boolean shouldExtractQuery = false;

              if (this.profileSql) {
                  shouldExtractQuery = true;
              } else if (this.logSlowQueries) {
                  long queryTime = queryEndTime - queryStartTime;

                  boolean logSlow = false;

                  // NOTE(review): the fixed threshold is used when
                  // useAutoSlowLog is true and the "abnormally long"
                  // heuristic when it is false - confirm the condition is
                  // not inverted.
                  if (this.useAutoSlowLog) {
                      logSlow = queryTime > this.connection.getSlowQueryThresholdMillis();
                  } else {
                      logSlow = this.connection.isAbonormallyLongQuery(queryTime);

                      this.connection.reportQueryTime(queryTime);
                  }

                  if (logSlow) {
                      shouldExtractQuery = true;
                      queryWasSlow = true;
                  }
              }

              if (shouldExtractQuery) {
                  // Extract the actual query from the network packet
                  boolean truncated = false;

                  int extractPosition = oldPacketPosition;

                  if (oldPacketPosition > this.connection.getMaxQuerySizeToLog()) {
                      extractPosition = this.connection.getMaxQuerySizeToLog() + 5;
                      truncated = true;
                  }

                  profileQueryToLog = StringUtils.toString(queryBuf, 5,
                          (extractPosition - 5));

                  if (truncated) {
                      profileQueryToLog += Messages.getString("MysqlIO.25"); //$NON-NLS-1$
                  }
              }

              fetchBeginTime = queryEndTime;
          }

          // Turn the server reply into a result set.
          ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType,
                  resultSetConcurrency, streamResults, catalog, resultPacket,
                  false, -1L, cachedMetadata);

          if (queryWasSlow && !this.serverQueryWasSlow /* don't log slow queries twice */) {
              StringBuffer mesgBuf = new StringBuffer(48 +
                      profileQueryToLog.length());

              mesgBuf.append(Messages.getString("MysqlIO.SlowQuery",
                      new Object[] {Long.valueOf(this.slowQueryThreshold),
                      queryTimingUnits,
                      Long.valueOf(queryEndTime - queryStartTime)}));
              mesgBuf.append(profileQueryToLog);

              ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);

              eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY,
                      "", catalog, this.connection.getId(), //$NON-NLS-1$
                      (callingStatement != null) ? callingStatement.getId() : 999,
                              ((ResultSetImpl)rs).resultId, System.currentTimeMillis(),
                              (int) (queryEndTime - queryStartTime), queryTimingUnits, null,
                              new Throwable(), mesgBuf.toString()));

              if (this.connection.getExplainSlowQueries()) {
                  if (oldPacketPosition < MAX_QUERY_SIZE_TO_EXPLAIN) {
                      explainSlowQuery(queryPacket.getBytes(5,
                              (oldPacketPosition - 5)), profileQueryToLog);
                  } else {
                      this.connection.getLog().logWarn(Messages.getString(
                              "MysqlIO.28") //$NON-NLS-1$
                              +MAX_QUERY_SIZE_TO_EXPLAIN +
                              Messages.getString("MysqlIO.29")); //$NON-NLS-1$
                  }
              }
          }

          if (this.logSlowQueries) {

              ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);

              // Each of the three events below is emitted only when SQL
              // profiling is on as well.
              if (this.queryBadIndexUsed && this.profileSql) {
                  eventSink.consumeEvent(new ProfilerEvent(
                          ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$
                          this.connection.getId(),
                          (callingStatement != null) ? callingStatement.getId()
                                  : 999, ((ResultSetImpl)rs).resultId,
                                  System.currentTimeMillis(),
                                  (queryEndTime - queryStartTime), this.queryTimingUnits,
                                  null,
                                  new Throwable(),
                                  Messages.getString("MysqlIO.33") //$NON-NLS-1$
                                  +profileQueryToLog));
              }

              if (this.queryNoIndexUsed && this.profileSql) {
                  eventSink.consumeEvent(new ProfilerEvent(
                          ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$
                          this.connection.getId(),
                          (callingStatement != null) ? callingStatement.getId()
                                  : 999, ((ResultSetImpl)rs).resultId,
                                  System.currentTimeMillis(),
                                  (queryEndTime - queryStartTime), this.queryTimingUnits,
                                  null,
                                  new Throwable(),
                                  Messages.getString("MysqlIO.35") //$NON-NLS-1$
                                  +profileQueryToLog));
              }

              if (this.serverQueryWasSlow && this.profileSql) {
                  eventSink.consumeEvent(new ProfilerEvent(
                          ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$
                          this.connection.getId(),
                          (callingStatement != null) ? callingStatement.getId()
                                  : 999, ((ResultSetImpl)rs).resultId,
                                  System.currentTimeMillis(),
                                  (queryEndTime - queryStartTime), this.queryTimingUnits,
                                  null,
                                  new Throwable(),
                                  Messages.getString("MysqlIO.ServerSlowQuery") //$NON-NLS-1$
                                  +profileQueryToLog));
              }
          }

          if (this.profileSql) {
              fetchEndTime = getCurrentTimeNanosOrMillis();

              ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);

              // One event for the query round trip ...
              eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_QUERY,
                      "", catalog, this.connection.getId(), //$NON-NLS-1$
                      (callingStatement != null) ? callingStatement.getId() : 999,
                              ((ResultSetImpl)rs).resultId, System.currentTimeMillis(),
                              (queryEndTime - queryStartTime), this.queryTimingUnits,
                              null,
                              new Throwable(), profileQueryToLog));

              // ... and one for reading the results.
              eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_FETCH,
                      "", catalog, this.connection.getId(), //$NON-NLS-1$
                      (callingStatement != null) ? callingStatement.getId() : 999,
                              ((ResultSetImpl)rs).resultId, System.currentTimeMillis(),
                              (fetchEndTime - fetchBeginTime), this.queryTimingUnits,
                              null,
                              new Throwable(), null));
          }

          if (this.hadWarnings) {
              scanForAndThrowDataTruncation();
          }

          if (this.statementInterceptors != null) {
              ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPost(
                      query, callingStatement, rs, false, null);

              // A "post" interceptor may substitute its own result set.
              if (interceptedResults != null) {
                  rs = interceptedResults;
              }
          }

          return rs;
      } catch (SQLException sqlEx) {
          if (this.statementInterceptors != null) {
              invokeStatementInterceptorsPost(
                      query, callingStatement, null, false, sqlEx); // we don't do anything with the result set in this case
          }

          // If the statement was cancelled while executing, report the
          // cancellation instead of the underlying failure.
          if (callingStatement != null) {
              synchronized (callingStatement.cancelTimeoutMutex) {
                  if (callingStatement.wasCancelled) {
                      SQLException cause = null;

                      if (callingStatement.wasCancelledByTimeout) {
                          cause = new MySQLTimeoutException();
                      } else {
                          cause = new MySQLStatementCancelledException();
                      }

                      callingStatement.resetCancelledState();

                      throw cause;
                  }
              }
          }

          throw sqlEx;
      } finally {
          this.statementExecutionDepth--;
      }
  }

 MysqlIO.readAllResults(..)处理生成ResultSet

 /**
  * Reads the first result from {@code resultPacket} and, when more results
  * follow, chains each of them onto the previous one via
  * {@code setNextResultSet}.
  *
  * @param callingStatement     statement the results belong to
  * @param maxRows              row limit passed on to the row reader
  * @param resultSetType        requested result set type
  * @param resultSetConcurrency requested result set concurrency
  * @param streamResults        whether rows are streamed rather than read eagerly
  * @param catalog              catalog for the created result sets
  * @param resultPacket         packet holding the start of the server reply
  * @param isBinaryEncoded      whether rows use the binary encoding
  * @param preSentColumnCount   forwarded unchanged to readResultsForQueryOrUpdate
  * @param metadataFromCache    cached field metadata, or null
  * @return the first result set of the reply
  * @throws SQLException if reading from the server fails
  */
 ResultSetImpl readAllResults(StatementImpl callingStatement, int maxRows,
        int resultSetType, int resultSetConcurrency, boolean streamResults,
        String catalog, Buffer resultPacket, boolean isBinaryEncoded,
        long preSentColumnCount, Field[] metadataFromCache)
        throws SQLException {
        // Step the read position back by one before parsing the reply.
        resultPacket.setPosition(resultPacket.getPosition() - 1);

        // Parse the first result of the reply.
        final ResultSetImpl firstResultSet = readResultsForQueryOrUpdate(callingStatement,
                maxRows, resultSetType, resultSetConcurrency, streamResults,
                catalog, resultPacket, isBinaryEncoded, preSentColumnCount,
                metadataFromCache);

        final boolean clientAcceptsMultiResults =
            (this.clientParam & CLIENT_MULTI_RESULTS) != 0;
        final boolean serverHasMoreResults =
            (this.serverStatus & SERVER_MORE_RESULTS_EXISTS) != 0;

        //
        // TODO: We need to support streaming of multiple result sets
        //
        if (serverHasMoreResults && streamResults) {
            // While streaming, further results are only tacked on here
            // when the first result reports an update count.
            if (firstResultSet.getUpdateCount() != -1) {
                tackOnMoreStreamingResults(firstResultSet);
            }

            reclaimLargeReusablePacket();

            return firstResultSet;
        }

        ResultSetImpl tail = firstResultSet;

        // Keep reading while the server status says another result follows.
        for (boolean moreRowSetsExist = clientAcceptsMultiResults && serverHasMoreResults;
                moreRowSetsExist;
                moreRowSetsExist = (this.serverStatus & SERVER_MORE_RESULTS_EXISTS) != 0) {
            final Buffer fieldPacket = checkErrorPacket();
            fieldPacket.setPosition(0);

            final ResultSetImpl next = readResultsForQueryOrUpdate(callingStatement,
                    maxRows, resultSetType, resultSetConcurrency,
                    streamResults, catalog, fieldPacket, isBinaryEncoded,
                    preSentColumnCount, metadataFromCache);

            tail.setNextResultSet(next);
            tail = next;
        }

        if (!streamResults) {
            clearInputStream();
        }

        reclaimLargeReusablePacket();

        return firstResultSet;
    }

=============================
 /**
  * Parses one server reply, dispatching on the field-length value at the
  * start of {@code resultPacket}:
  * <ul>
  * <li>0: the result is built by {@code buildResultSetWithUpdates};</li>
  * <li>{@code Buffer.NULL_LENGTH}: a file name is read from the packet and
  *     passed to {@code sendFileToServer};</li>
  * <li>anything else: the value is the column count of a row-returning
  *     result, which is read by {@code getResultSet}.</li>
  * </ul>
  *
  * @param callingStatement     statement the result belongs to
  * @param maxRows              row limit passed on to the row reader
  * @param resultSetType        requested result set type
  * @param resultSetConcurrency requested result set concurrency
  * @param streamResults        whether rows are streamed
  * @param catalog              catalog for the created result set
  * @param resultPacket         packet positioned at the field-length value
  * @param isBinaryEncoded      whether rows use the binary encoding
  * @param preSentColumnCount   not used by this method
  * @param metadataFromCache    cached field metadata, or null
  * @return the result set for this reply
  * @throws SQLException if reading from the server fails
  */
 protected final ResultSetImpl readResultsForQueryOrUpdate(
        StatementImpl callingStatement, int maxRows, int resultSetType,
        int resultSetConcurrency, boolean streamResults, String catalog,
        Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount,
        Field[] metadataFromCache) throws SQLException {
        final long columnCount = resultPacket.readFieldLength();

        if (columnCount == 0) {
            return buildResultSetWithUpdates(callingStatement, resultPacket);
        }

        if (columnCount != Buffer.NULL_LENGTH) {
            // Row-returning result: read metadata and rows.
            return getResultSet(callingStatement,
                    columnCount, maxRows, resultSetType, resultSetConcurrency,
                    streamResults, catalog, isBinaryEncoded,
                    metadataFromCache);
        }

        // NULL_LENGTH: the packet carries a file name to send to the server.
        final String charEncoding = this.connection.getUseUnicode()
            ? this.connection.getEncoding()
            : null;

        final String fileName;

        // The connection encoding is applied only when the platform and
        // database charsets match and an encoding is known.
        if (this.platformDbCharsetMatches && charEncoding != null) {
            fileName = resultPacket.readString(charEncoding, getExceptionInterceptor());
        } else {
            fileName = resultPacket.readString();
        }

        return sendFileToServer(callingStatement, fileName);
    }

=========================
    /**
     * Reads the column definitions and row data of a row-returning reply and
     * wraps them in a result set.
     *
     * Column packets are unpacked into {@code Field}s unless cached metadata
     * is supplied, in which case they are skipped. Rows are then exposed in
     * one of three ways: through a server-side cursor, as a streamed
     * {@code RowDataDynamic}, or fully read via {@code readSingleRowSet}.
     *
     * @param callingStatement     statement the result belongs to; may be null
     * @param columnCount          number of columns announced by the server
     * @param maxRows              row limit used when rows are read eagerly
     * @param resultSetType        requested result set type
     * @param resultSetConcurrency requested result set concurrency
     * @param streamResults        whether rows are streamed
     * @param catalog              catalog for the created result set
     * @param isBinaryEncoded      whether rows use the binary encoding
     * @param metadataFromCache    cached field metadata, or null to read it
     *                             from the server
     * @return the populated result set
     * @throws SQLException if reading from the server fails
     */
    protected ResultSetImpl getResultSet(StatementImpl callingStatement,
        long columnCount, int maxRows, int resultSetType,
        int resultSetConcurrency, boolean streamResults, String catalog,
        boolean isBinaryEncoded, Field[] metadataFromCache)
        throws SQLException {
        // Column metadata read from the server; stays null when the cached
        // metadata is used instead.
        Field[] fields = null;

        if (metadataFromCache != null) {
            // Metadata is already known: discard the column packets.
            for (int i = 0; i < columnCount; i++) {
                skipPacket();
            }
        } else {
            // We want the metadata from the server: one packet per column.
            fields = new Field[(int) columnCount];

            for (int i = 0; i < columnCount; i++) {
                fields[i] = unpackField(readPacket(), false);
            }
        }

        // The packet following the column definitions carries the server status.
        final Buffer packet = reuseAndReadPacket(this.reusablePacket);

        readServerStatusForResultSets(packet);

        //
        // Handle cursor-based fetch first
        //
        final boolean cursorFetchCandidate = this.connection.versionMeetsMinimum(5, 0, 2)
                && this.connection.getUseCursorFetch()
                && isBinaryEncoded
                && callingStatement != null
                && callingStatement.getFetchSize() != 0
                && callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY;

        if (cursorFetchCandidate) {
            final ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement;

            //
            // Server versions 5.0.5 or newer will only open
            // a cursor and set this flag if they can, otherwise
            // they punt and go back to mysql_store_results() behavior
            //
            boolean usingCursor = true;

            if (this.connection.versionMeetsMinimum(5, 0, 5)) {
                usingCursor = (this.serverStatus & SERVER_STATUS_CURSOR_EXISTS) != 0;
            }

            if (usingCursor) {
                final RowData cursorRows = new RowDataCursor(this, prepStmt, fields);

                final ResultSetImpl cursorResultSet = buildResultSetWithRows(
                        callingStatement, catalog, fields, cursorRows,
                        resultSetType, resultSetConcurrency, isBinaryEncoded);

                cursorResultSet.setFetchSize(callingStatement.getFetchSize());

                return cursorResultSet;
            }
        }

        // Metadata used from here on: the cached copy when one was supplied.
        final Field[] effectiveFields = (metadataFromCache != null) ? metadataFromCache : fields;

        final RowData rowData;

        if (streamResults) {
            // Rows are pulled from the connection on demand.
            rowData = new RowDataDynamic(this, (int) columnCount,
                    effectiveFields, isBinaryEncoded);
            this.streamingData = rowData;
        } else {
            // Read every row now and keep them as RowData.
            rowData = readSingleRowSet(columnCount, maxRows,
                    resultSetConcurrency, isBinaryEncoded, effectiveFields);
        }

        // Create the ResultSetImpl object.
        return buildResultSetWithRows(callingStatement, catalog,
                effectiveFields, rowData, resultSetType,
                resultSetConcurrency, isBinaryEncoded);
    }
==================================
   /**
    * Reads every remaining row of the current result from the server and
    * returns them as a {@code RowDataStatic}.
    *
    * The first row is always kept. Later rows are kept only while
    * {@code maxRows} is -1 or fewer than {@code maxRows} rows have been
    * stored, but reading continues until {@code nextRow} returns null.
    *
    * @param columnCount          number of columns per row
    * @param maxRows              maximum number of rows to keep, or -1 for no limit
    * @param resultSetConcurrency concurrency passed on to {@code nextRow}
    * @param isBinaryEncoded      whether rows use the binary encoding
    * @param fields               column metadata for the rows
    * @return the collected rows
    * @throws SQLException if reading a row fails
    */
   private RowData readSingleRowSet(long columnCount, int maxRows,
        int resultSetConcurrency, boolean isBinaryEncoded, Field[] fields)
        throws SQLException {
        final ArrayList<ResultSetRow> collected = new ArrayList<ResultSetRow>();

        final boolean useBufferRowExplicit = useBufferRowExplicit(fields);

        int kept = 0;

        // Read all rows, collecting the retained ones in the list.
        for (ResultSetRow current = nextRow(fields, (int) columnCount, isBinaryEncoded,
                    resultSetConcurrency, false, useBufferRowExplicit, false, null);
                current != null;
                current = nextRow(fields, (int) columnCount, isBinaryEncoded,
                    resultSetConcurrency, false, useBufferRowExplicit, false, null)) {
            // The first row is stored unconditionally; the limit applies
            // from the second row onwards.
            final boolean withinLimit = (kept == 0) || (maxRows == -1) || (kept < maxRows);

            if (withinLimit) {
                collected.add(current);
                kept++;
            }
        }

        return new RowDataStatic(collected);
    }

=============================
   /**
    * Creates the {@code ResultSetImpl} for the given rows according to the
    * requested concurrency.
    *
    * For {@code CONCUR_READ_ONLY} and {@code CONCUR_UPDATABLE} the result set
    * type and concurrency are recorded on the returned object. For any other
    * concurrency value a non-updatable instance is returned without them.
    *
    * @param callingStatement     statement the result belongs to
    * @param catalog              catalog for the result set
    * @param fields               column metadata
    * @param rows                 row data to expose
    * @param resultSetType        result set type to record
    * @param resultSetConcurrency concurrency selecting the kind of result set
    * @param isBinaryEncoded      whether rows use the binary encoding; only
    *                             applied to read-only result sets
    * @return the new result set
    * @throws SQLException if the result set cannot be created
    */
    private com.mysql.jdbc.ResultSetImpl buildResultSetWithRows(
        StatementImpl callingStatement, String catalog,
        com.mysql.jdbc.Field[] fields, RowData rows, int resultSetType,
        int resultSetConcurrency, boolean isBinaryEncoded)
        throws SQLException {
        final boolean readOnly =
            (resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY);
        final boolean updatable =
            (resultSetConcurrency == java.sql.ResultSet.CONCUR_UPDATABLE);

        // Only CONCUR_UPDATABLE yields an updatable instance.
        final ResultSetImpl rs = com.mysql.jdbc.ResultSetImpl.getInstance(catalog,
                fields, rows, this.connection, callingStatement, updatable);

        if (!readOnly && !updatable) {
            // Unrecognised concurrency: hand back the plain instance as is.
            return rs;
        }

        if (readOnly && isBinaryEncoded) {
            rs.setBinaryEncoded();
        }

        rs.setResultSetType(resultSetType);
        rs.setResultSetConcurrency(resultSetConcurrency);

        return rs;
    }
 

 

 

 

 

 

 

 

 

 

分享到:
评论
1 楼 c929833623 2013-09-22  

相关推荐

    JDBC(powernode 文档)源代码

    3.3 创建lib目录并引入MYSQL驱动包 3.4 把lib包引入项目环境 4.使用JDBC完成添加操作 4.1 步骤 4.2 代码 4.3 URL详解 4.3.1 为什么要定义URL 4.4查看数据库 4.4.1 一个URL由哪些部分组成 5.使用JDBC完成...

    JDBC.txt是链接mysql数据库的源码,复制粘贴即可,需要mysql.jar包

    String url="jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8&serverTimezone=UTC"; Connection conn= DriverManager.getConnection(url,"root","123456"); //创建语句对象 String sql="select *from ...

    mysql-connector-java-5.1.48.zip

    ResultSet rs=statement.executeQuery(sql); while (rs.next()){ sbmysql.append(rs.getInt("id")+"\t"+rs.getString("username")+"\t"+rs.getString("password")+"\n"); } textdata.setText(sbmysql....

    JDBC 连接MySQL数据库的一系列详细步骤

    JDBC 连接MySQL数据库的一系列详细步骤,包括JDBC API详解 1.DriverManager 2.Connection 3.Statement 4.ResultSet 5.PrepareStatement

    struts JDBC 实现简单的增删改查、登录注册

    String url = "jdbc:mysql://localhost:3306/" + database; String userName = "root"; String password = "123"; Connection con = null; Statement st = null; ResultSet rs = null; String quersql...

    JDBC访问数据库的步骤

    4. 从包中找到要用的驱动,展开包,从中找到Driver.class,编程时,先把这个类的驱动new一个实例对象出来,告诉DriverManage,要连到哪种数据库上: 方法一:Class.forName(“com.mysql.jdbc.Driver”); Class: ...

    MySQL 5.1参考手册

    2.8.6. 在Windows下从源码安装MySQL 2.8.7. 在Windows下编译MySQL客户端 2.9. 安装后的设置和测试 2.9.1. Windows下安装后的过程 2.9.2. Unix下安装后的过程 2.9.3. 使初始MySQL账户安全 2.10. 升级MySQL 2.10.1. 从...

    jdbc连接代码

    String connectionString = "jdbc:mysql://localhost:3306/blog?user=root&password=root"; // 定义驱动数据库的类 String driver = "org.gjt.mm.mysql.Driver"; // 定义连接数据库对象conn Connection ...

    MySQL 5.1参考手册中文版

    2.8.6. 在Windows下从源码安装MySQL 2.8.7. 在Windows下编译MySQL客户端 2.9. 安装后的设置和测试 2.9.1. Windows下安装后的过程 2.9.2. Unix下安装后的过程 2.9.3. 使初始MySQL账户安全 2.10. 升级MySQL ...

    jdbc-demo:二进制详解系列(二)------ jdbc-mysql的使用和分析-mysql

    用于注册驱动,是获取Connection对象的入口Driver数据库驱动,用于获取Connection对象Connection数据库连接,用于获取Statement对象,管理事务Statement sql执行器,用于执行sql ResultSet结果集,用于封装和操作...

    Java JDBC.pptx

    掌握JDBC操作数据库的步骤 熟悉JDBC的常用API 了解什么是JDBC DriverManager类、Connection接口、Statement接口 连接mysql数据库方法 PreparedStatement接口 ResultSet接口

    mysql-connector-java-5.1.32.zip

    * mysql:使用mysql的jdbc服务 * localhost:使用本地的mysql的jdbc服务 * 3306:使用本地的3306端口的mysql的jdbc服务 * ------- * jt_db 使用的数据库 * characterEncoding=utf-8 设置编码...

    MYSQL中文手册

    2.8.6. 在Windows下从源码安装MySQL 2.8.7. 在Windows下编译MySQL客户端 2.9. 安装后的设置和测试 2.9.1. Windows下安装后的过程 2.9.2. Unix下安装后的过程 2.9.3. 使初始MySQL账户安全 2.10. 升级MySQL ...

    mysql-connector-java-commercial-5.1.30-bin.jar

    Statement stmt=conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE,ResultSet.CONCUR_UPDATABLE); String sql="select * from first"; ResultSet rs=stmt.executeQuery(sql); while(rs.next()) {%&gt; 您的第...

    mysql-connector-java-5.1.46-bin

    3.创建statement对象 一个statement对象只能打开一个resultset,如果有两个  同样结果的结果集交叉访问,这两个结果集必定为  两个不同的statement对象所创建,如果在打开一个  新的结果集的时候存在一个已经打开的...

    java连接mysql实例

    }// Mysql 的驱动 Connection conn = DriverManager.getConnection("jdbc:mysql://localhost/TEST1","root", "123"); String sql="select * from stu "; PreparedStatement sss=conn.prepareStatement(sql); ...

    mysql官方中文参考手册

    2.8.6. 在Windows下从源码安装MySQL 2.8.7. 在Windows下编译MySQL客户端 2.9. 安装后的设置和测试 2.9.1. Windows下安装后的过程 2.9.2. Unix下安装后的过程 2.9.3. 使初始MySQL账户安全 2.10. 升级MySQL 2.10.1. 从...

    mysql 的安装及操作的课件

    MySQL installation and configuration, operation MySQL database, the concept and commonly used interface JDBC, use Statement and PreparedStatement, use the ResultSet access database, Swing operated in ...

    JDBC(powernode CD2206)详尽版 (教学视频、源代码、SQL文件)

    四、编写第一个JDBC程序 五、注册案例 六、登录案例 6.1 Statement 七、SQL注入 7.1 SQL注入 7.2 出现SQL注入的原因 7.3 解决方案 7.4 PreparedStatement接口 7.5 PreparedStatement如何解决SQL注入 7.6 使用...

Global site tag (gtag.js) - Google Analytics