Sunday, April 03, 2016

SQL Server Bulk Copy using Delphi

When inserting a lot of data into a SQL Server table, the memory-based bulk copy exposed by the IRowsetFastLoad interface in Microsoft's OLE DB providers is the fastest way.

Here are the steps listed in the example above (with some comments added by me):
  1. Establish a connection to the data source.
  2. In Delphi, you can simply set up a TADOConnection component and set its Connected property to True:
        Connection.Provider := 'SQLOLEDB'; // or 'SQLNCLI11'
        Connection.ConnectionString := 'Integrated Security=SSPI;Data Source=localhost;';
        Connection.LoginPrompt := False;
        Connection.KeepConnection := True;
        Connection.Connected := True;
    
  3. Set the SQLOLEDB provider-specific data source property SSPROP_ENABLEFASTLOAD to VARIANT_TRUE. This allows the newly created session to access the IRowsetFastLoad interface.
  4. In fact, from my experiments, this step seems to be unnecessary, as the bulk copy works without setting this property, too. (Or perhaps it's set already by default.)
  5. Create a session requesting the IOpenRowset interface, and
  6. Call IOpenRowset.OpenRowset to open a rowset that includes all the rows from the target table:
  7. function OpenFastLoad(Connection: TADOConnection;
      const TableName: WideString): IRowsetFastLoad; overload;
    var
      ConnectionConstruction: ADOConnectionConstruction;
    begin
      SetProperty(Connection, DBPROPSET_SQLSERVERDATASOURCE, SSPROP_ENABLEFASTLOAD, True);
      ConnectionConstruction := Connection.ConnectionObject as ADOConnectionConstruction;
      Result := OpenFastLoad(ConnectionConstruction.Get_DSO as IDBCreateSession, TableName);
    end;
    
    function OpenFastLoad(const DBCreateSession: IDBCreateSession;
      const TableName: WideString): IRowsetFastLoad; overload;
    var
      OpenRowSet: IOpenRowset;
      TableID: TDBID;
    begin
      OleDbCheck(DBCreateSession.CreateSession(nil, IID_IOpenRowset, IUnknown(OpenRowSet)),
        DBCreateSession, IID_IDBCreateSession);
      TableID.eKind := DBKIND_NAME;
      TableID.uName.pwszName := PWideChar(TableName);
      OleDbCheck(OpenRowSet.OpenRowset(nil, @TableID, nil, IID_IRowsetFastLoad, 0, nil,
        @Result), OpenRowSet, IID_IOpenRowset);
    end;
    
  8. Do the necessary bindings and create an accessor using IAccessor.CreateAccessor:
  9. The bindings are an array of TDBBinding records (declared in Winapi.OleDb) describing each inserted column and their offsets in the buffer:
    procedure InitializeBinding(Field: TField; var Binding: TDBBinding; var Offset: Integer);
    begin
      Binding.iOrdinal := Field.FieldNo; // column ordinal position
      Binding.wType := FieldTypeToOleDbType(Field.DataType); // column data type
      if Field.IsBlob then
        Binding.wType := Binding.wType or DBTYPE_BYREF; // pointer to external blob data
      Binding.eParamIO := DBPARAMIO_NOTPARAM;
      Binding.dwMemOwner := DBMEMOWNER_CLIENTOWNED; // we are releasing the memory
      Binding.obLength := Offset; // length field offset (starts with 0 for the first column)
      Binding.obStatus := Binding.obLength + SizeOf(DBLENGTH); // status field offset
      Binding.obValue := Binding.obStatus + SizeOf(DBSTATUS); // value offset
      Binding.dwPart := DBPART_LENGTH or DBPART_STATUS or DBPART_VALUE; // included parts
      case Field.DataType of
        ftDate:
          Binding.cbMaxLen := SizeOf(TDBDate); // OLE DB date
        ftTime:
          Binding.cbMaxLen := SizeOf(TDBTime); // OLE DB time
        ftDateTime, ftTimeStamp:
          Binding.cbMaxLen := SizeOf(TDBTimeStamp); // OLE DB timestamp
        else
          Binding.cbMaxLen := Field.DataSize;
      end;
    
      Inc(Offset, SizeOf(TColumnData) + Binding.cbMaxLen - 1); // next column's offset...
      Align(Offset); // ...aligned to 8 bytes
    end;
    
      ...
      OleDbCheck(FastLoad.QueryInterface(IID_IAccessor, Accessor), FastLoad, IID_IRowsetFastLoad);
      OleDbCheck(Accessor.CreateAccessor(DBACCESSOR_ROWDATA, Dataset.FieldCount, Bindings, BufferSize,
        AccessorHandle, StatusCodes), Accessor, IID_IAccessor);
    
    
  10. Set up the memory buffer from which the data will be copied to the table.
  11. The record buffer is a sequence of TColumnData records (of variable size):
    type
      DBLENGTH = ULONGLONG;
      PColumnData = ^TColumnData;
      TColumnData = record
        Length: DBLENGTH; // data length
        Status: DBSTATUS; // null or has a value
        Data: array[0..0] of Byte; // value data
      end;
    
    For each column, fill in the length, status and data fields within the buffer (code simplified):
    procedure GetFieldValue(Field: TField; const Binding: TDBBinding; Buffer: Pointer);
    var
      Column: PColumnData;
    begin
      Column := Pointer(NativeUInt(Buffer) + Binding.obLength);
      if Field.IsNull then
      begin
        Column^.Status := DBSTATUS_S_ISNULL;
        Column^.Length := 0;
      end
      else
      begin
        Column^.Status := DBSTATUS_S_OK;
        case Field.DataType of
          ftDate:
            with PDBDate(@Column^.Data[0])^ do
              DecodeDate(Field.AsDateTime, Word(year), month, day);
          ftTime:
            with PDBTime(@Column^.Data[0])^ do
              DecodeTime(Field.AsDateTime, hour, minute, second, MSec);
          ftDateTime, ftTimeStamp:
            with PDBTimeStamp(@Column^.Data[0])^ do
            begin
              DecodeDate(Field.AsDateTime, Word(year), month, day);
              DecodeTime(Field.AsDateTime, hour, minute, second, MSec);
              fraction := MSec * 1000000;
            end;
          else
            Field.GetData(@Column^.Data[0], False);
        end;
        case Field.DataType of
          ftString, ftMemo:
            Column^.Length := StrLen(PAnsiChar(@Column^.Data[0]));
          ftWideString, ftWideMemo:
            Column^.Length := StrLen(PWideChar(@Column^.Data[0])) * SizeOf(WideChar);
          else
            Column^.Length := Field.DataSize;
        end;
      end;
    end;
    
  12. Call IRowsetFastLoad.InsertRow to bulk copy the data in to the table.
  13. (Repeat the previous and this step for each row you want to insert.)
  14. Call IRowsetFastLoad.Commit to commit all changes.
Since you explicitly control in your code how the buffers are allocated and populated, this approach offers some ways to optimize for performance especially when inserting large amounts of data. It's also possible to set up bulk insert sessions inserting into the same table in parallel from multiple threads.
As a starting point, you can find the example source code here.

Sunday, March 27, 2016

The strange limitation of 64 threads

When using the Windows I/O Completion Port (IOCP), people seem to limit their thread pools to a maximum of 64 threads.

This is probably caused by the fact that WaitForMultipleObjects limits the number of input handles with the nice magic constant MAXIMUM_WAIT_OBJECTS (which happens to be 64).

Here are a few examples:

The (anti-)pattern is related to the process of shutting down the thread pool: to do this cleanly, the threads in the pool should be allowed to finish what they're doing (or just wake up if they're idle at the moment), perform any cleaning up as necessary and terminate correctly. The shutdown is usually performed in two steps:
  1. Send a shutdown signal (completion key) to each thread.
  2. Each thread in the pool calls GetQueuedCompletionStatus in a loop and checks for the special (application-defined) shutdown completion key to which it responds by breaking out of the loop and terminating. The shutdown procedure can therefore simply send the shutdown completion key to the IOCP as many times as there are threads, relying on the fact that exactly one thread will respond to exactly one such signal.
  3. Wait for all threads to terminate.
  4. The shutdown is not complete before all threads actually had a chance to receive the signal and terminate. Only then it's safe to continue closing the IOCP, freeing memory, etc. So we absolutely have to wait for the threads to terminate. The reasoning here seems to be: Since WaitForMultipleObjects can only handle up to 64 threads, we can't allow more threads to be associated with the pool in the first place, can we?

Well, there's no need to use WaitForMultipleObjects in Step 2. It's fairly easy to keep a counter of active threads in the pool (interlocked-incremented when a thread starts, interlocked-decremented when a thread is finished). When the counter reaches zero (no more active threads), signal an event. With only one event to wait for, you can use WaitForSingleObject in Step 2.

Wednesday, August 26, 2015

Starting FPC compiler changed to 2.6.4

The starting FPC compiler (used to compile the latest trunk FPC) has been 2.6.2 for some time now. This has changed recently; to compile the latest trunk FPC you need FPC 2.6.4 which has not yet been pushed to official repositories. At the moment the easiest work-around is to upgrade to FPC 2.6.4 using Petr Hlo┼żek's PPA:

~ $ sudo add-apt-repository ppa:ok2cqr/lazarus
~ $ sudo apt-get update
~ $ sudo apt-get upgrade
~ $ sudo apt-get dist-upgrade


After the upgrade, your /etc/fpc.cfg is once more a symlink pointing to /etc/alternatives/fpc.cfg which itself is a symlink pointing to /etc/fpc-2.6.4.cfg. If you're following my previous notes you'll need to redirect /etc/fpc.cfg to your ~/Development/fpc.cfg again:

~/Development $ sudo rm /etc/fpc.cfg
~/Development $ sudo ln -s ~/Development/fpc.cfg /etc/fpc.cfg