Oracleへのデータのロード

前回の記事で、非同期リクエストを使用する場合、SNMPプロトコルを使用するデバイスのポーリング速度は毎秒9000リクエストに達することを示しました(そのような応答ストリームを形成するのに十分なデバイスがある場合)。 このデータストリームをどうするかという問題は未解決のままです。



RDBMS(Oracle Databaseなど)を介して監視データを処理することは一般的な方法です。 しかし、従来のリレーショナルデータベースはこの負荷を処理できるのでしょうか? それを確かめてみましょう。



ティンスミスの日



まず、Oracleに初めて触れる初心者が犯しがちなミスを分析することから始めましょう。 監視データの処理にはかなり複雑なロジックが含まれる場合がありますが、これについては後で説明します。 今のところは、次のテーブルにデータを保存してみましょう。



-- Sequence that supplies values for test_data.id.
create sequence test_data_seq;

-- Target table of the load test: one row per stored sample.
-- value is kept as text; event_date defaults to the insert time (sysdate).
create table test_data (
  id             number                              not null,
  device_id      number                              not null,
  parameter_id   number                              not null,
  value          varchar2(100),
  event_date     date             default sysdate    not null
);

-- The unique index on id is created explicitly, under its own name,
-- before the primary key constraint on the same column is declared.
create unique index test_data_pk on test_data(id);

alter table test_data add
  constraint pk_test_data primary key(id);

      
      





ID , online-:



-- Row-level BEFORE INSERT trigger: assigns the next value of
-- test_data_seq to the id of every row inserted into test_data.
CREATE OR REPLACE TRIGGER test_data_bri
  BEFORE INSERT ON test_data
  FOR EACH ROW
BEGIN
  SELECT test_data_seq.NEXTVAL
  INTO   :NEW.id
  FROM   dual;
END;
/

      
      





:



!
package com.acme.ae.tests.jdbc;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Baseline insert benchmark: builds a new SQL text for every row, prepares
 * and executes it, then prints the achieved rate in rows per second.
 *
 * <p>NOTE(review): the statement text is concatenated per row. Only the
 * {@code int} loop index is spliced in, so nothing untrusted reaches the SQL,
 * but every text is distinct. This looks deliberate for the baseline
 * measurement and is therefore left as is.
 */
public class Test {
	
	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";

	/** Number of rows inserted by one benchmark run. */
	private final static int    ROW_COUNT  = 1000;

	private Connection c = null;

	/**
	 * Converts a row count and an elapsed time into whole rows per second.
	 *
	 * @param rows          number of rows processed
	 * @param elapsedMillis measured duration in milliseconds; anything below
	 *                      one millisecond is counted as one millisecond so a
	 *                      very fast run cannot cause a division by zero
	 * @return rows per second, truncated to a whole number
	 */
	static long rowsPerSecond(int rows, long elapsedMillis) {
		return rows * 1000L / Math.max(1L, elapsedMillis);
	}

	/**
	 * Inserts {@link #ROW_COUNT} rows, one freshly built statement per row,
	 * and prints the resulting rate.
	 *
	 * @throws SQLException if preparing, executing or closing a statement fails
	 */
	private void test() throws SQLException {
		StringBuilder sb = new StringBuilder();
		long timestamp = System.currentTimeMillis();
		for (int i = 0; i < ROW_COUNT; i++) {
			sb.setLength(0);
			sb.append("insert into test_data(device_id, parameter_id, value) values (");
			sb.append(i);
			sb.append(",1,'0')");
			CallableStatement st = c.prepareCall(sb.toString());
			try {
				st.execute();
			} finally {
				st.close();
			}
		}
		System.out.println(rowsPerSecond(ROW_COUNT, System.currentTimeMillis() - timestamp));
	}

	/**
	 * Loads the JDBC driver class and opens the connection.
	 *
	 * @throws ClassNotFoundException if the driver class is not on the classpath
	 * @throws SQLException           if the connection cannot be opened
	 */
	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
	}

	/**
	 * Closes the connection if one was opened.
	 *
	 * @throws SQLException if closing fails
	 */
	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	/** Runs the benchmark once; any failure is printed to standard output. */
	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test();
			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

      
      







, , :



614

      
      





. 9000. ?



( ):



  1. hard parse, ( , sql injection , , )
  2. , JDBC auto commit ( , )
  3. SQL PL/SQL ( , )
  4. , sequence ( , )


. , :



test_package.sql
-- Public interface of test_package: a single procedure that stores one
-- sample (device, parameter, value).
CREATE OR REPLACE PACKAGE test_package IS
    PROCEDURE addData(
        p_device    IN NUMBER,
        p_parameter IN NUMBER,
        p_value     IN NUMBER
    );
END test_package;
/

show errors;

CREATE OR REPLACE PACKAGE BODY test_package IS

    -- Inserts one row into test_data, taking the id directly from
    -- test_data_seq inside the INSERT statement.
    PROCEDURE addData(
        p_device    IN NUMBER,
        p_parameter IN NUMBER,
        p_value     IN NUMBER
    ) IS
    BEGIN
        INSERT INTO test_data (id, device_id, parameter_id, value)
        VALUES (test_data_seq.NEXTVAL, p_device, p_parameter, p_value);
    END addData;

END test_package;
/

show errors;

      
      







package com.acme.ae.tests.jdbc;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Benchmark variant that prepares one call to {@code test_package.addData}
 * and re-executes it with bind variables for every row. Auto-commit is
 * switched on explicitly via {@link #AUTO_COMMIT_MODE}.
 */
public class Test {
	
	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";

	private final static boolean AUTO_COMMIT_MODE = true;

	/** Number of rows inserted by one benchmark run. */
	private final static int     ROW_COUNT        = 1000;

	private final static String  ADD_DATA_SQL     = 
			"begin test_package.addData(?,?,?); end;";

	private Connection c = null;

	/**
	 * Converts a row count and an elapsed time into whole rows per second.
	 *
	 * @param rows          number of rows processed
	 * @param elapsedMillis measured duration in milliseconds; anything below
	 *                      one millisecond is counted as one millisecond so a
	 *                      very fast run cannot cause a division by zero
	 * @return rows per second, truncated to a whole number
	 */
	static long rowsPerSecond(int rows, long elapsedMillis) {
		return rows * 1000L / Math.max(1L, elapsedMillis);
	}
	
	/**
	 * Executes the prepared call {@link #ROW_COUNT} times, rebinding the
	 * three parameters each time, and prints the resulting rate.
	 *
	 * @throws SQLException if preparing, binding, executing or closing fails
	 */
	private void test() throws SQLException {
		CallableStatement st = c.prepareCall(ADD_DATA_SQL);
		try {
			long timestamp = System.currentTimeMillis();
			for (int i = 0; i < ROW_COUNT; i++) {
				st.setInt(1, i);
				st.setInt(2, 1);
				st.setString(3, "0");
				st.execute();
			}
			System.out.println(rowsPerSecond(ROW_COUNT, System.currentTimeMillis() - timestamp));
		} finally {
			st.close();
		}
	}

	/**
	 * Loads the JDBC driver class, opens the connection and applies
	 * {@link #AUTO_COMMIT_MODE}.
	 *
	 * @throws ClassNotFoundException if the driver class is not on the classpath
	 * @throws SQLException           if the connection cannot be opened or configured
	 */
	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
		c.setAutoCommit(AUTO_COMMIT_MODE);
	}
	
	/**
	 * Closes the connection if one was opened.
	 *
	 * @throws SQLException if closing fails
	 */
	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	/** Runs the benchmark once; any failure is printed to standard output. */
	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test();
			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

      
      







, . Oracle , SQL, PL/SQL-.



:



956

      
      





, .



, , !



? auto commit. Oracle, , ( ) ORA-01555.



, . - , . ( ) JDBC.



? , , . , — , . , .



package com.acme.ae.tests.jdbc;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Benchmark variant with auto-commit switched off: all rows are inserted
 * through one prepared call to {@code test_package.addData} and committed
 * once at the end.
 */
public class Test {
	
	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";
	
	private final static boolean AUTO_COMMIT_MODE = false;

	/** Number of rows inserted by one benchmark run. */
	private final static int     ROW_COUNT        = 1000;

	private final static String  ADD_DATA_SQL     = 
			"begin test_package.addData(?,?,?); end;";

	private Connection c = null;

	/**
	 * Converts a row count and an elapsed time into whole rows per second.
	 *
	 * @param rows          number of rows processed
	 * @param elapsedMillis measured duration in milliseconds; anything below
	 *                      one millisecond is counted as one millisecond so a
	 *                      very fast run cannot cause a division by zero
	 * @return rows per second, truncated to a whole number
	 */
	static long rowsPerSecond(int rows, long elapsedMillis) {
		return rows * 1000L / Math.max(1L, elapsedMillis);
	}
	
	/**
	 * Inserts {@link #ROW_COUNT} rows in one transaction, commits, and prints
	 * the resulting rate. If anything fails before the commit, the pending
	 * work is rolled back so that closing the connection afterwards cannot
	 * make a partial load permanent.
	 *
	 * @throws SQLException if preparing, executing, committing or closing fails
	 */
	private void test() throws SQLException {
		CallableStatement st = c.prepareCall(ADD_DATA_SQL);
		try {
			long timestamp = System.currentTimeMillis();
			for (int i = 0; i < ROW_COUNT; i++) {
				st.setInt(1, i);
				st.setInt(2, 1);
				st.setString(3, "0");
				st.execute();
			}
			c.commit();
			System.out.println(rowsPerSecond(ROW_COUNT, System.currentTimeMillis() - timestamp));
		} catch (SQLException e) {
			rollbackQuietly();
			throw e;
		} finally {
			st.close();
		}
	}

	/** Rolls back the open transaction without hiding the failure that led here. */
	private void rollbackQuietly() {
		try {
			c.rollback();
		} catch (SQLException ignored) {
			// The failure that triggered the rollback is the one worth reporting.
		}
	}

	/**
	 * Loads the JDBC driver class, opens the connection and applies
	 * {@link #AUTO_COMMIT_MODE}.
	 *
	 * @throws ClassNotFoundException if the driver class is not on the classpath
	 * @throws SQLException           if the connection cannot be opened or configured
	 */
	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
		c.setAutoCommit(AUTO_COMMIT_MODE);
	}
	
	/**
	 * Closes the connection if one was opened.
	 *
	 * @throws SQLException if closing fails
	 */
	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	/** Runs the benchmark once; any failure is printed to standard output. */
	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test();
			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

      
      







:



1524

      
      





, :



-- Replace the regular test_data table with a global temporary table.
drop table test_data;

-- Same columns as before except id, which is no longer present.
-- ON COMMIT DELETE ROWS: the rows are removed when the transaction commits.
create global temporary table test_data (
  device_id      number                              not null,
  parameter_id   number                              not null,
  value          varchar2(100),
  event_date     date             default sysdate    not null
) on commit delete rows;

      
      





id, :



test_package.sql
CREATE OR REPLACE PACKAGE BODY test_package IS

    -- Inserts one row into test_data; no id is supplied because this
    -- version of the statement does not reference an id column.
    PROCEDURE addData(
        p_device    IN NUMBER,
        p_parameter IN NUMBER,
        p_value     IN NUMBER
    ) IS
    BEGIN
        INSERT INTO test_data (device_id, parameter_id, value)
        VALUES (p_device, p_parameter, p_value);
    END addData;

END test_package;
/

show errors;

      
      







REDO- ( ), :



1779

      
      





DML- , 2000. ?



! , , — , . , bulk-, .



:



Bulk
package com.acme.ae.tests.jdbc;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Benchmark variant that queues calls to {@code test_package.addData} with
 * JDBC batching ({@link #BULK_SIZE} calls per batch) and commits once at
 * the end.
 */
public class Test {
	
	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";
	
	private final static boolean AUTO_COMMIT_MODE = false;
	private final static int     BULK_SIZE        = 10;

	/** Number of rows inserted by one benchmark run. */
	private final static int     ROW_COUNT        = 1000;
	
	private final static String  ADD_DATA_SQL     = 
			"begin test_package.addData(?,?,?); end;";

	private Connection c = null;

	/**
	 * Converts a row count and an elapsed time into whole rows per second.
	 *
	 * @param rows          number of rows processed
	 * @param elapsedMillis measured duration in milliseconds; anything below
	 *                      one millisecond is counted as one millisecond so a
	 *                      very fast run cannot cause a division by zero
	 * @return rows per second, truncated to a whole number
	 */
	static long rowsPerSecond(int rows, long elapsedMillis) {
		return rows * 1000L / Math.max(1L, elapsedMillis);
	}
	
	/**
	 * Adds {@link #ROW_COUNT} parameter sets to the batch, flushing every
	 * {@link #BULK_SIZE} entries and once more for any remainder, then
	 * commits and prints the resulting rate. On failure the pending work is
	 * rolled back so that closing the connection cannot make it permanent.
	 *
	 * @throws SQLException if preparing, batching, committing or closing fails
	 */
	private void test() throws SQLException {
		CallableStatement st = c.prepareCall(ADD_DATA_SQL);
		try {
			int bulkSize = BULK_SIZE;
			long timestamp = System.currentTimeMillis();
			for (int i = 0; i < ROW_COUNT; i++) {
				st.setInt(1, i);
				st.setInt(2, 1);
				st.setString(3, "0");
				st.addBatch();
				if (--bulkSize <= 0) {
					st.executeBatch();
					bulkSize = BULK_SIZE;
				}
			}
			// A counter below BULK_SIZE means entries were queued after the last flush.
			if (bulkSize < BULK_SIZE) {
				st.executeBatch();
			}
			c.commit();
			System.out.println(rowsPerSecond(ROW_COUNT, System.currentTimeMillis() - timestamp));
		} catch (SQLException e) {
			rollbackQuietly();
			throw e;
		} finally {
			st.close();
		}
	}

	/** Rolls back the open transaction without hiding the failure that led here. */
	private void rollbackQuietly() {
		try {
			c.rollback();
		} catch (SQLException ignored) {
			// The failure that triggered the rollback is the one worth reporting.
		}
	}

	/**
	 * Loads the JDBC driver class, opens the connection and applies
	 * {@link #AUTO_COMMIT_MODE}.
	 *
	 * @throws ClassNotFoundException if the driver class is not on the classpath
	 * @throws SQLException           if the connection cannot be opened or configured
	 */
	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
		c.setAutoCommit(AUTO_COMMIT_MODE);
	}
	
	/**
	 * Closes the connection if one was opened.
	 *
	 * @throws SQLException if closing fails
	 */
	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	/** Runs the benchmark once; any failure is printed to standard output. */
	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test();
			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

      
      







:



1779

      
      





. ?



, bulk- DML- (insert, update, delete). bulk- , JDBC bulk, - , .



:



Bulk insert-
package com.acme.ae.tests.jdbc;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Benchmark variant that batches a plain {@code INSERT} statement
 * ({@link #BULK_SIZE} rows per batch) instead of a PL/SQL call, and commits
 * once at the end.
 *
 * <p>NOTE(review): the INSERT is prepared through {@code prepareCall};
 * {@code prepareStatement} would be the conventional choice. It is left
 * unchanged here so the measured code path stays the same.
 */
public class Test {
	
	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";
	
	private final static boolean AUTO_COMMIT_MODE = false;
	private final static int     BULK_SIZE        = 10;

	/** Number of rows inserted by one benchmark run. */
	private final static int     ROW_COUNT        = 1000;
	
	private final static String  ADD_DATA_SQL     = 
			"insert into test_data(device_id, parameter_id, value) values (?,?,?)";
	
	private Connection c = null;

	/**
	 * Converts a row count and an elapsed time into whole rows per second.
	 *
	 * @param rows          number of rows processed
	 * @param elapsedMillis measured duration in milliseconds; anything below
	 *                      one millisecond is counted as one millisecond so a
	 *                      very fast run cannot cause a division by zero
	 * @return rows per second, truncated to a whole number
	 */
	static long rowsPerSecond(int rows, long elapsedMillis) {
		return rows * 1000L / Math.max(1L, elapsedMillis);
	}
	
	/**
	 * Adds {@link #ROW_COUNT} rows to the batch, flushing every
	 * {@link #BULK_SIZE} rows and once more for any remainder, then commits
	 * and prints the resulting rate. On failure the pending work is rolled
	 * back so that closing the connection cannot make it permanent.
	 *
	 * @throws SQLException if preparing, batching, committing or closing fails
	 */
	private void test() throws SQLException {
		CallableStatement st = c.prepareCall(ADD_DATA_SQL);
		try {
			int bulkSize = BULK_SIZE;
			long timestamp = System.currentTimeMillis();
			for (int i = 0; i < ROW_COUNT; i++) {
				st.setInt(1, i);
				st.setInt(2, 1);
				st.setString(3, "0");
				st.addBatch();
				if (--bulkSize <= 0) {
					st.executeBatch();
					bulkSize = BULK_SIZE;
				}
			}
			// A counter below BULK_SIZE means rows were queued after the last flush.
			if (bulkSize < BULK_SIZE) {
				st.executeBatch();
			}
			c.commit();
			System.out.println(rowsPerSecond(ROW_COUNT, System.currentTimeMillis() - timestamp));
		} catch (SQLException e) {
			rollbackQuietly();
			throw e;
		} finally {
			st.close();
		}
	}

	/** Rolls back the open transaction without hiding the failure that led here. */
	private void rollbackQuietly() {
		try {
			c.rollback();
		} catch (SQLException ignored) {
			// The failure that triggered the rollback is the one worth reporting.
		}
	}

	/**
	 * Loads the JDBC driver class, opens the connection and applies
	 * {@link #AUTO_COMMIT_MODE}.
	 *
	 * @throws ClassNotFoundException if the driver class is not on the classpath
	 * @throws SQLException           if the connection cannot be opened or configured
	 */
	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
		c.setAutoCommit(AUTO_COMMIT_MODE);
	}
	
	/**
	 * Closes the connection if one was opened.
	 *
	 * @throws SQLException if closing fails
	 */
	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	/** Runs the benchmark once; any failure is printed to standard output. */
	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test();
			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

      
      







:



12658

      
      





かなり良くなりました! BULK_SIZE を 100 に増やすと、次の結果になります:



31250

      
      





, … commit-, .



?



. :



image



test_device test_parameter. test_state, test_history . , null.



, , test_parameter_type. , test_history:



  1. 'default' ( ) , test_history
  2. 'uptime' , test_history test_state , ( )


, , .



-- Devices, keyed by id.
create table test_device (
  id             number                              not null,
  name           varchar2(100)
);

create unique index test_device_pk on test_device(id);

alter table test_device add
  constraint pk_test_device primary key(id);

-- Parameter types, keyed by id.
create table test_parameter_type (
  id             number                              not null,
  name           varchar2(100)
);

create unique index test_parameter_type_pk on test_parameter_type(id);

alter table test_parameter_type add
  constraint pk_test_parameter_type primary key(id);

-- Parameters; every parameter references one parameter type through type_id.
create table test_parameter (
  id             number                              not null,
  type_id        number                              not null,
  name           varchar2(100)
);

create unique index test_parameter_pk on test_parameter(id);

-- Index on the foreign key column.
create index test_parameter_fk on test_parameter(type_id);

alter table test_parameter add
  constraint pk_test_parameter primary key(id);

alter table test_parameter add
  constraint fk_test_parameter foreign key (type_id) 
    references test_parameter_type(id);

-- One row per (device, parameter) pair holding a value and last_date;
-- the pair is the primary key.
create table test_state (
  device_id      number                              not null,
  parameter_id   number                              not null,
  value          varchar2(100)                       not null,
  last_date      date             default sysdate    not null
);

create unique index test_state_pk on test_state(device_id, parameter_id);

-- Index on the parameter foreign key column.
create index test_state_fk on test_state(parameter_id);

alter table test_state add
  constraint pk_test_state primary key(device_id, parameter_id);

alter table test_state add
  constraint fk_test_state foreign key (device_id) 
    references test_device(id);

alter table test_state add
  constraint fk_test_state_parameter foreign key (parameter_id) 
    references test_parameter(id);

-- Sequence that supplies values for test_history.id.
create sequence test_history_seq;

-- History rows: a value for a device/parameter pair with its event_date,
-- keyed by a sequence-generated id.
create table test_history (
  id             number                              not null,
  device_id      number                              not null,
  parameter_id   number                              not null,
  value          varchar2(100)                       not null,
  event_date     date             default sysdate    not null
);

create unique index test_history_pk on test_history(id);

-- Indexes on both foreign key columns.
create index test_history_device_fk on test_history(device_id);

create index test_history_parameter_fk on test_history(parameter_id);

alter table test_history add
  constraint pk_test_history primary key(id);

alter table test_history add
  constraint fk_test_history_device foreign key (device_id) 
    references test_device(id);

alter table test_history add
  constraint fk_test_history_parameter foreign key (parameter_id) 
    references test_parameter(id);

      
      







-- Parameter types: 1 = 'default', 2 = 'uptime'.
Insert into TEST_PARAMETER_TYPE
   (ID, NAME)
 Values
   (1, 'default');
Insert into TEST_PARAMETER_TYPE
   (ID, NAME)
 Values
   (2, 'uptime');
COMMIT;

-- Parameters: 'status' uses type 1, 'uptime' uses type 2.
-- They are inserted after the types because TYPE_ID references them.
Insert into TEST_PARAMETER
   (ID, TYPE_ID, NAME)
 Values
   (1, 1, 'status');
Insert into TEST_PARAMETER
   (ID, TYPE_ID, NAME)
 Values
   (2, 2, 'uptime');
COMMIT;

-- Fill test_device with one row per row of all_objects, using rownum as
-- the id and the object name as the device name.
insert into test_device(id, name)
select rownum, object_name
from   all_objects;

commit;

      
      







, :



-- Public interface of test_package: a single parameterless procedure,
-- saveData.
create or replace package test_package is
    procedure saveData;
end test_package;
/

show errors;

CREATE OR REPLACE package body test_package as

    -- Processes the rows currently held in test_data: appends to
    -- test_history according to the parameter type, brings test_state up
    -- to date, and commits.
    procedure saveData as
    begin
    
      -- 'default' parameters: add a history row for every incoming row that
      -- has no test_state row yet or whose value differs from the stored one.
      insert into test_history(id, device_id, parameter_id, value, event_date)
      select test_history_seq.nextval, a.device_id, a.parameter_id, a.value, a.event_date
      from   test_data a
      inner  join test_device b on ( b.id = a.device_id )
      inner  join test_parameter c on ( c.id = a.parameter_id )
      inner  join test_parameter_type d on ( d.id = c.type_id and d.name = 'default' )
      left   join test_state e on ( e.device_id = a.device_id and 
                                    e.parameter_id = a.parameter_id )
      where  e.value is null or e.value <> a.value;
      
      -- 'uptime' parameters: when the stored value is greater than the
      -- incoming one, add a history row carrying the stored value and its
      -- last_date.
      -- NOTE(review): both value columns are varchar2, so this comparison is
      -- a string comparison — confirm that is intended for uptime values.
      insert into test_history(id, device_id, parameter_id, value, event_date)
      select test_history_seq.nextval, a.device_id, a.parameter_id, e.value, e.last_date
      from   test_data a
      inner  join test_device b on ( b.id = a.device_id )
      inner  join test_parameter c on ( c.id = a.parameter_id )
      inner  join test_parameter_type d on ( d.id = c.type_id and d.name = 'uptime' )
      left   join test_state e on ( e.device_id = a.device_id and 
                                    e.parameter_id = a.parameter_id )
      where  e.value > a.value;
      
      -- Upsert test_state from the incoming rows whose device and parameter
      -- exist: update value/last_date on a match, insert a new row otherwise.
      -- NOTE(review): if test_data can hold several rows for the same
      -- device/parameter pair, verify how the MERGE is expected to behave.
      merge into test_state a
      using ( select c.device_id, c.parameter_id, c.value, c.event_date
              from   test_data c
              inner  join test_device d on ( d.id = c.device_id )
              inner  join test_parameter e on ( e.id = c.parameter_id )
            ) b
      on ( b.device_id = a.device_id and b.parameter_id = a.parameter_id )
      when matched then
        update set a.value = b.value, a.last_date = b.event_date
      when not matched then
        insert (device_id, parameter_id, value, last_date)
        values (b.device_id, b.parameter_id, b.value, b.event_date);
    
      -- Make the changes permanent; presumably this also clears test_data
      -- when it is the ON COMMIT DELETE ROWS temporary table — TODO confirm.
      commit;
    
    end;

end test_package;
/

show errors;

      
      







, , , Oracle .



:



package com.acme.ae.tests.jdbc;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Benchmark variant that batch-inserts the raw rows into {@code test_data}
 * and then calls {@code test_package.saveData} once to process them.
 * The printed rate covers both steps.
 */
public class Test {
	
	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";
	
	private final static boolean AUTO_COMMIT_MODE = false;
	private final static int     BULK_SIZE        = 100;

	/** Number of rows inserted by one benchmark run. */
	private final static int     ROW_COUNT        = 1000;
	
	private final static String  ADD_DATA_SQL     = 
			"insert into test_data(device_id, parameter_id, value) values (?,?,?)";

	private final static String  SAVE_DATA_SQL    = 
			"begin test_package.saveData; end;";
	
	private Connection c = null;

	/**
	 * Converts a row count and an elapsed time into whole rows per second.
	 *
	 * @param rows          number of rows processed
	 * @param elapsedMillis measured duration in milliseconds; anything below
	 *                      one millisecond is counted as one millisecond so a
	 *                      very fast run cannot cause a division by zero
	 * @return rows per second, truncated to a whole number
	 */
	static long rowsPerSecond(int rows, long elapsedMillis) {
		return rows * 1000L / Math.max(1L, elapsedMillis);
	}
	
	/**
	 * Batch-inserts {@link #ROW_COUNT} rows ({@link #BULK_SIZE} per batch),
	 * then executes the {@code saveData} call and prints the overall rate.
	 * No commit is issued from Java in this variant.
	 *
	 * @throws SQLException if preparing, batching, executing or closing fails
	 */
	private void test() throws SQLException {
		long timestamp = System.currentTimeMillis();
		CallableStatement st = c.prepareCall(ADD_DATA_SQL);
		try {
			int bulkSize = BULK_SIZE;
			for (int i = 0; i < ROW_COUNT; i++) {
				st.setInt(1, i);
				st.setInt(2, 1);
				st.setString(3, "0");
				st.addBatch();
				if (--bulkSize <= 0) {
					st.executeBatch();
					bulkSize = BULK_SIZE;
				}
			}
			// A counter below BULK_SIZE means rows were queued after the last flush.
			if (bulkSize < BULK_SIZE) {
				st.executeBatch();
			}
		} finally {
			st.close();
		}
		st = c.prepareCall(SAVE_DATA_SQL);
		try {
			st.execute();
		} finally {
			st.close();
		}
		System.out.println(rowsPerSecond(ROW_COUNT, System.currentTimeMillis() - timestamp));
	}

	/**
	 * Loads the JDBC driver class, opens the connection and applies
	 * {@link #AUTO_COMMIT_MODE}.
	 *
	 * @throws ClassNotFoundException if the driver class is not on the classpath
	 * @throws SQLException           if the connection cannot be opened or configured
	 */
	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
		c.setAutoCommit(AUTO_COMMIT_MODE);
	}
	
	/**
	 * Closes the connection if one was opened.
	 *
	 * @throws SQLException if closing fails
	 */
	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	/** Runs the benchmark once; any failure is printed to standard output. */
	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test();
			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

      
      







:



5319

      
      





, . . 5000 , , 600. , Oracle ( ).



, InMemory DB. .



?



, , . , , .



InMemory NoSQL DB . ( REDO- Oracle TimesTen), ( ).



, — ? . , , , , .






All Articles