Lesson Learned #234: Parallel vs Single Running a Bulk Insert with Python
Published Aug 18 2022 12:16 PM 1,413 Views

Today, We've been working on a service request that our customer wants to improve the performance of a bulk insert process. Following, I would like to share my experience working on that.

 

Our customer mentioned that inserting data (100.000 rows) is taking 14 seconds in a database in Business Critical. I was able to reproduce this time using a single thread using a table with 20 columns.

 

In order to improve this Python code, I suggested to run in parallel this bulk insert every batch size of 10.000 rows and also, I followed the best practices reducing the execution time of this process:

 

    • Client Virtual Machine level:
      • Accelerated networking enabled.
      • Depending how many parallel process that I needed create a CPU/Vcore, in this case, 10 vCores.
      • Placed the virtual machine in the same region that the DB is.
    • Database level:
      • Create a table with 20 columns.
      • As the PK is a sequential key I included in the clustered index definition the parameter  OPTIMIZE_FOR_SEQUENTIAL_KEY = ON
      • Configure the same number of CPU/vCores with the maximum number of parallel process that I would like to have. In this case, 10 vCores.
      • Depeding on amount of data use Business Critical to reduce the storage latency.
    • Python code level:
      • Using executemany method in order to reduce the network roundtrips, sending only the value of the parameters.
      • Running in batches (1000,10000) instead a single process.
      • Use SET NOCOUNT ON to reduce the replied response/rowset about how many rows were inserted.
      • In the connectionstring use autocommit=False

Example of python code that you could find here. This Python reads a CSV file and for every 10000 rows execute a bulk insert using thread pool. 

 

 

 

 

import csv
import pyodbc
import threading
import os
import datetime 

class ThreadsOrder: #Class to run in parallel the process.
    def ExecuteSQL(self,a,s,n):
        TExecutor = threading.Thread(target=ExecuteSQL,args=(a,s,n,))
        TExecutor.start()

def SaveResults( Message, bSaveFile): #Save the details of the file.
    try:
        print(Message)

        if (bSaveFile==True):
            file_object  = open(filelog, "a") 
            file_object.write(datetime.datetime.strftime(datetime.datetime.now(), '%d/%m/%y %H:%M:%S') + '-' + Message + '\n' )
            file_object.close()
    except BaseException as e:
        print('And error occurred - ' , format(e))


def ExecuteSQLcc(sTableName):
    try:
        cnxn1 = pyodbc.connect("DRIVER={ODBC Driver 17 for SQL Server};APP=Bulk Insert Test;SERVER=" + SQL_server + ";DATABASE=" + SQL_database + ";UID=" +SQL_user+';PWD='+ SQL_password, autocommit=False, Timeout=3600)
        cursor = cnxn1.cursor()
        cursor.execute("DROP TABLE IF EXISTS" + sTableName )
        cursor.commit()
        cursor.execute("CREATE TABLE " + sTableName + " (" \
                            "	[Key] [int] NOT NULL," \
                            "	[Num_TEST] [int] NULL," \
                            "	[TEST_01] [varchar](6) NULL," \
                            "	[TEST_02] [varchar](6) NULL," \
                            "	[TEST_03] [varchar](6) NULL," \
                            "	[TEST_04] [varchar](6) NULL," \
                            "	[TEST_05] [varchar](6) NULL," \
                            "	[TEST_06] [varchar](6) NULL," \
                            "	[TEST_07] [varchar](6) NULL," \
                            "	[TEST_08] [varchar](6) NULL," \
                            "	[TEST_09] [varchar](6) NULL," \
                            "	[TEST_10] [varchar](6) NULL," \
                            "	[TEST_11] [varchar](6) NULL," \
                            "	[TEST_12] [varchar](6) NULL," \
                            "	[TEST_13] [varchar](6) NULL," \
                            "	[TEST_14] [varchar](6) NULL," \
                            "	[TEST_15] [varchar](6) NULL," \
                            "	[TEST_16] [varchar](6) NULL," \
                            "	[TEST_17] [varchar](6) NULL," \
                            "	[TEST_18] [varchar](6) NULL," \
                            "	[TEST_19] [varchar](6) NULL," \
                            "	[TEST_20] [varchar](6) NULL)")
        cursor.commit()
        cursor.execute("CREATE CLUSTERED INDEX [ix_ms_example] ON " + sTableName + " ([Key] ASC) WITH (STATISTICS_NORECOMPUTE = OFF, DROP_EXISTING = OFF, ONLINE = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = ON) ON [PRIMARY]")
        cursor.commit()
    except BaseException as e:
            SaveResults('Executing SQL - an error occurred - ' + format(e),True)


def ExecuteSQL(a,sTableName,n):
    try:
        Before = datetime.datetime.now()   
        
        if n==-1:
            sTypeProcess = "NoAsync"
        else:
            sTypeProcess="Async - Thread:" + str(n) 
        SaveResults('Executing at ' + str(Before) + " Process Type: " + sTypeProcess, True )      
        cnxn1 = pyodbc.connect("DRIVER={ODBC Driver 17 for SQL Server};APP=Bulk Insert Test;SERVER=" + SQL_server + ";DATABASE=" + SQL_database + ";UID=" +SQL_user+';PWD='+ SQL_password, autocommit=False, Timeout=3600)

        cursor = cnxn1.cursor()
        cursor.fast_executemany = True
        cursor.executemany("SET NOCOUNT ON;INSERT INTO " + sTableName +" ([Key], Num_TEST, TEST_01, TEST_02, TEST_03, TEST_04, TEST_05, TEST_06, TEST_07, TEST_08, TEST_09, TEST_10, TEST_11, TEST_12, TEST_13, TEST_14, TEST_15, TEST_16, TEST_17, TEST_18, TEST_19, TEST_20) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",a)                          
        cursor.commit()
        SaveResults('Time Difference INSERT process ' + str(datetime.datetime.now() - Before) + " " + sTypeProcess, True )  

    except BaseException as e:
            SaveResults('Executing SQL - an error occurred - ' + format(e),True)


#Connectivity details.
SQL_server = 'tcp:servername.database.windows.net,1433'
SQL_database = 'databasename'
SQL_user = 'username'
SQL_password = 'password'


#file details to read 
filepath = 'c:\\k\\' ##To Read the demo file
filelog = filepath + '\\Error.log' #Save the log.

chunksize = 10000 #Transaction batch rows.
sTableName = "[test_data]" #Table Name (dummy)

pThreadOrder = ThreadsOrder()
nThread = 0 #Number of Threads -- Right now, we provided an unlimited threads.

ExecuteSQLcc(sTableName)

Before = datetime.datetime.now()  
line_count = 0         
for directory, subdirectories, files in os.walk(filepath):
    for file in files:
      name, ext = os.path.splitext(file)
      if ext == '.csv': 
            a=[]
            SaveResults('Reading the file ' + name ,True)
            BeforeFile= datetime.datetime.now()             
            with open(os.path.join(directory,file), mode='r') as csv_file:
              csv_reader = csv.reader(csv_file, delimiter=',')
              for row in csv_reader:
                    line_count+= 1  
                    if line_count>1:    
                        a.append(row)

                    if (line_count%chunksize)==0:
                        deltaFile = datetime.datetime.now() - BeforeFile
                        nThread=nThread+1                        
                        SaveResults('Time Difference Reading file is ' + str(deltaFile) + ' for ' + str(line_count) + ' rows', True )                        
                        pThreadOrder.ExecuteSQL(a,sTableName,nThread) #Open a new theard per transaction batch size.
                        #ExecuteSQL(a,sTableName,-1)   
                        a=[]
            BeforeFile= datetime.datetime.now() 

SaveResults('Total Time Difference Reading file is ' + str(datetime.datetime.now() - Before) + ' for ' + str(line_count) + ' rows for the file: ' + name , True )  

 

 

 

 

 

During the execution if you need to know the connections, number of rows and the impact in terms of resources see the following TSQL

 

 

 

 

 

SELECT
 substring(REPLACE(REPLACE(SUBSTRING(ST.text, (req.statement_start_offset/2) + 1, (
(CASE statement_end_offset WHEN -1 THEN DATALENGTH(ST.text) ELSE req.statement_end_offset END
- req.statement_start_offset)/2) + 1) , CHAR(10), ' '), CHAR(13), ' '), 1, 512) AS statement_text
,req.database_id
,program_name
,req.session_id
, req.cpu_time 'cpu_time_ms'
, req.status
, wait_time
, wait_resource
, wait_type
, last_wait_type
, req.total_elapsed_time
, total_scheduled_time
, req.row_count as [Row Count]
, command
, scheduler_id
, memory_usage
, req.writes
, req.reads
, req.logical_reads, blocking_session_id
FROM sys.dm_exec_requests AS req
inner join sys.dm_exec_sessions as sess on sess.session_id = req.session_id
CROSS APPLY sys.dm_exec_sql_text(req.sql_handle) as ST
where req.session_id <> @@SPID

select count(*) from test_data

select * from sys.dm_db_resource_stats order by end_time desc

 

 

 

 

 

Enjoy!

 

Version history
Last update:
‎Aug 19 2022 05:01 AM
Updated by: