Sunday, April 03, 2016

SQL Server Bulk Copy using Delphi

When inserting a lot of data into a SQL Server table, the memory-based bulk copy exposed by the IRowsetFastLoad interface in Microsoft's OLE DB providers is the fastest way.

Here are the steps listed in the example above (with some comments added by me):
  1. Establish a connection to the data source.
  2. In Delphi, you can simply set up a TADOConnection component and set its Connected property to True:
        Connection.Provider := 'SQLOLEDB'; // or 'SQLNCLI11'
        Connection.ConnectionString := 'Integrated Security=SSPI;Data Source=localhost;';
        Connection.LoginPrompt := False;
        Connection.KeepConnection := True;
        Connection.Connected := True;
    
  3. Set the SQLOLEDB provider-specific data source property SSPROP_ENABLEFASTLOAD to VARIANT_TRUE. This allows the newly created session to access the IRowsetFastLoad interface.
  4. In fact, from my experiments, this step seems to be unnecessary, as the bulk copy works without setting this property, too. (Or perhaps it's set already by default.)
  5. Create a session requesting the IOpenRowset interface, and
  6. Call IOpenRowset.OpenRowset to open a rowset that includes all the rows from the target table:
  7. function OpenFastLoad(Connection: TADOConnection;
      const TableName: WideString): IRowsetFastLoad; overload;
    var
      ConnectionConstruction: ADOConnectionConstruction;
    begin
      SetProperty(Connection, DBPROPSET_SQLSERVERDATASOURCE, SSPROP_ENABLEFASTLOAD, True);
      ConnectionConstruction := Connection.ConnectionObject as ADOConnectionConstruction;
      Result := OpenFastLoad(ConnectionConstruction.Get_DSO as IDBCreateSession, TableName);
    end;
    
    function OpenFastLoad(const DBCreateSession: IDBCreateSession;
      const TableName: WideString): IRowsetFastLoad; overload;
    var
      OpenRowSet: IOpenRowset;
      TableID: TDBID;
    begin
      OleDbCheck(DBCreateSession.CreateSession(nil, IID_IOpenRowset, IUnknown(OpenRowSet)),
        DBCreateSession, IID_IDBCreateSession);
      TableID.eKind := DBKIND_NAME;
      TableID.uName.pwszName := PWideChar(TableName);
      OleDbCheck(OpenRowSet.OpenRowset(nil, @TableID, nil, IID_IRowsetFastLoad, 0, nil,
        @Result), OpenRowSet, IID_IOpenRowset);
    end;
    
  8. Do the necessary bindings and create an accessor using IAccessor.CreateAccessor:
  9. The bindings are an array of TDBBinding records (declared in Winapi.OleDb) describing each inserted column and their offsets in the buffer:
    procedure InitializeBinding(Field: TField; var Binding: TDBBinding; var Offset: Integer);
    begin
      Binding.iOrdinal := Field.FieldNo; // column ordinal position
      Binding.wType := FieldTypeToOleDbType(Field.DataType); // column data type
      if Field.IsBlob then
        Binding.wType := Binding.wType or DBTYPE_BYREF; // pointer to external blob data
      Binding.eParamIO := DBPARAMIO_NOTPARAM;
      Binding.dwMemOwner := DBMEMOWNER_CLIENTOWNED; // we are releasing the memory
      Binding.obLength := Offset; // length field offset (starts with 0 for the first column)
      Binding.obStatus := Binding.obLength + SizeOf(DBLENGTH); // status field offset
      Binding.obValue := Binding.obStatus + SizeOf(DBSTATUS); // value offset
      Binding.dwPart := DBPART_LENGTH or DBPART_STATUS or DBPART_VALUE; // included parts
      case Field.DataType of
        ftDate:
          Binding.cbMaxLen := SizeOf(TDBDate); // OLE DB date
        ftTime:
          Binding.cbMaxLen := SizeOf(TDBTime); // OLE DB time
        ftDateTime, ftTimeStamp:
          Binding.cbMaxLen := SizeOf(TDBTimeStamp); // OLE DB timestamp
        else
          Binding.cbMaxLen := Field.DataSize;
      end;
    
      Inc(Offset, SizeOf(TColumnData) + Binding.cbMaxLen - 1); // next column's offset...
      Align(Offset); // ...aligned to 8 bytes
    end;
    
      ...
      OleDbCheck(FastLoad.QueryInterface(IID_IAccessor, Accessor), FastLoad, IID_IRowsetFastLoad);
      OleDbCheck(Accessor.CreateAccessor(DBACCESSOR_ROWDATA, Dataset.FieldCount, Bindings, BufferSize,
        AccessorHandle, StatusCodes), Accessor, IID_IAccessor);
    
    
  10. Set up the memory buffer from which the data will be copied to the table.
  11. The record buffer is a sequence of TColumnData records (of variable size):
    type
      DBLENGTH = ULONGLONG;
      PColumnData = ^TColumnData;
      TColumnData = record
        Length: DBLENGTH; // data length
        Status: DBSTATUS; // null or has a value
        Data: array[0..0] of Byte; // value data
      end;
    
    For each column, fill in the length, status and data fields within the buffer (code simplified):
    procedure GetFieldValue(Field: TField; const Binding: TDBBinding; Buffer: Pointer);
    var
      Column: PColumnData;
    begin
      Column := Pointer(NativeUInt(Buffer) + Binding.obLength);
      if Field.IsNull then
      begin
        Column^.Status := DBSTATUS_S_ISNULL;
        Column^.Length := 0;
      end
      else
      begin
        Column^.Status := DBSTATUS_S_OK;
        case Field.DataType of
          ftDate:
            with PDBDate(@Column^.Data[0])^ do
              DecodeDate(Field.AsDateTime, Word(year), month, day);
          ftTime:
            with PDBTime(@Column^.Data[0])^ do
              DecodeTime(Field.AsDateTime, hour, minute, second, MSec);
          ftDateTime, ftTimeStamp:
            with PDBTimeStamp(@Column^.Data[0])^ do
            begin
              DecodeDate(Field.AsDateTime, Word(year), month, day);
              DecodeTime(Field.AsDateTime, hour, minute, second, MSec);
              fraction := MSec * 1000000;
            end;
          else
            Field.GetData(@Column^.Data[0], False);
        end;
        case Field.DataType of
          ftString, ftMemo:
            Column^.Length := StrLen(PAnsiChar(@Column^.Data[0]));
          ftWideString, ftWideMemo:
            Column^.Length := StrLen(PWideChar(@Column^.Data[0])) * SizeOf(WideChar);
          else
            Column^.Length := Field.DataSize;
        end;
      end;
    end;
    
  12. Call IRowsetFastLoad.InsertRow to bulk copy the data in to the table.
  13. (Repeat the previous and this step for each row you want to insert.)
  14. Call IRowsetFastLoad.Commit to commit all changes.
Since you explicitly control in your code how the buffers are allocated and populated, this approach offers some ways to optimize for performance especially when inserting large amounts of data. It's also possible to set up bulk insert sessions inserting into the same table in parallel from multiple threads.
As a starting point, you can find the example source code here.

No comments: