// Simulate thread creation for test case before we check thread cache DBUG_EXECUTE_IF("fail_thread_create", error = 1; goto handle_error;);
if (!check_idle_thread_and_enqueue_connection(channel_info)) DBUG_RETURN(false);
/* There are no idle threads avaliable to take up the new connection. Create a new thread to handle the connection */ channel_info->set_prior_thr_create_utime(); error = mysql_thread_create(key_thread_one_connection, &id, &connection_attrib, handle_connection, (void *)channel_info); #ifndef DBUG_OFF handle_error: #endif// !DBUG_OFF
for (;;) { THD *thd = init_new_thd(channel_info); if (thd == NULL) { ... //错误处理,略 }
#ifdef HAVE_PSI_THREAD_INTERFACE if (pthread_reused) { ... //错误处理,略 }
#ifdef HAVE_PSI_THREAD_INTERFACE /* Find the instrumented thread */ PSI_thread *psi = PSI_THREAD_CALL(get_thread)(); /* Save it within THD, so it can be inspected */ thd->set_psi(psi); #endif/* HAVE_PSI_THREAD_INTERFACE */ mysql_thread_set_psi_id(thd->thread_id()); mysql_thread_set_psi_THD(thd); mysql_socket_set_thread_owner( thd->get_protocol_classic()->get_vio()->mysql_socket);
thd_manager->add_thd(thd);
if (thd_prepare_connection(thd)) handler_manager->inc_aborted_connects(); else { while (thd_connection_alive(thd)) { if (do_command(thd)) break; } end_connection(thd); } close_connection(thd, 0, false, false);
// Clean up errors now, before possibly waiting for a new connection. #ifndef HAVE_WOLFSSL #if OPENSSL_VERSION_NUMBER < 0x10100000L ERR_remove_thread_state(0); #endif/* OPENSSL_VERSION_NUMBER < 0x10100000L */
#ifdef HAVE_PSI_THREAD_INTERFACE /* Delete the instrumentation for the job that just completed. */ thd->set_psi(NULL); PSI_THREAD_CALL(delete_current_thread)(); #endif/* HAVE_PSI_THREAD_INTERFACE */
delete thd;
// Server is shutting down so end the pthread. if (connection_events_loop_aborted()) break;
channel_info = Per_thread_connection_handler::block_until_new_connection(); if (channel_info == NULL) break; pthread_reused = true; if (connection_events_loop_aborted()) { ... //错误处理,略 } }
booldo_command(THD *thd){ bool return_value; int rc; NET *net = NULL; enum enum_server_command command; COM_DATA com_data; DBUG_ENTER("do_command"); DBUG_ASSERT(thd->is_classic_protocol());
/* indicator of uninitialized lex => normal flow of errors handling (see my_message_sql) */ thd->lex->set_current_select(0);
/* XXX: this code is here only to clear possible errors of init_connect. Consider moving to prepare_new_connection_state() instead. That requires making sure the DA is cleared before non-parsing statements such as COM_QUIT. */ thd->clear_error(); // Clear error message thd->get_stmt_da()->reset_diagnostics_area();
/* This thread will do a blocking read from the client which will be interrupted when the next command is received from the client, the connection is closed or "net_wait_timeout" number of seconds has passed. */ net = thd->get_protocol_classic()->get_net(); my_net_set_read_timeout(net, thd->variables.net_wait_timeout); net_new_transaction(net);
/* Synchronization point for testing of KILL_CONNECTION. This sync point can wait here, to simulate slow code execution between the last test of thd->killed and blocking in read(). The goal of this test is to verify that a connection does not hang, if it is killed at this point of execution. (Bug#37780 - main.kill fails randomly) Note that the sync point wait itself will be terminated by a kill. In this case it consumes a condition broadcast, but does not change anything else. The consumed broadcast should not matter here, because the read/recv() below doesn't use it. */ DEBUG_SYNC(thd, "before_do_command_net_read");
/* Because of networking layer callbacks in place, this call will maintain the following instrumentation: - IDLE events - SOCKET events - STATEMENT events - STAGE events when reading a new network packet. In particular, a new instrumented statement is started. See init_net_server_extension() */ thd->m_server_idle = true; rc = thd->get_protocol()->get_command(&com_data, &command); thd->m_server_idle = false;
out: /* The statement instrumentation must be closed in all cases. */ DBUG_ASSERT(thd->m_digest == NULL); DBUG_ASSERT(thd->m_statement_psi == NULL); DBUG_RETURN(return_value); }
/** Perform one connection-level (COM_XXXX) command. @param thd connection handle @param command type of command to perform @param com_data com_data union to store the generated command @todo set thd->lex->sql_command to SQLCOM_END here. @todo The following has to be changed to an 8 byte integer @retval 0 ok @retval 1 request of thread shutdown, i. e. if command is COM_QUIT */ booldispatch_command(THD *thd, const COM_DATA *com_data, enum enum_server_command command){ ... //太长不看 switch (command) { case ... //太长不看 case COM_QUERY: { ... //太长不看 mysql_parse(thd, &parser_state);
Channel_info *Per_thread_connection_handler::block_until_new_connection() { Channel_info *new_conn = NULL; mysql_mutex_lock(&LOCK_thread_cache); if (blocked_pthread_count < max_blocked_pthreads && !shrink_cache) { /* Don't kill the pthread, just block it for reuse */ DBUG_PRINT("info", ("Blocking pthread for reuse"));
/* mysys_var is bound to the physical thread, so make sure mysys_var->dbug is reset to a clean state before picking another session in the thread cache. */ DBUG_POP(); DBUG_ASSERT(!_db_is_pushed_());