by-nc-sa     개발자, DBA가 함께 만들어가는 구루비 지식창고!

4장 클라이언트 API 고급 기능 (4.2 ~ 4.5)




4.2카운터

온라인광고 클릭수 페이지 보기 수 등
속도가 느린 일괄처리 문제를 실시간 환경으로 전환할 수 있는 가능성 제공

4.2.1 카운터 소개

직접 row를 잠그고, 읽고, 증가, 락해제 => 부하가 많은데,
HBase는 check-and-modify 뿐만아니라 Counter기능을 제공한다.

명령어

incr '<table>', '<row>', '<column>', [<increment-value>]

  • 기본 사용
    hbase(main):001:0> create 'counters', 'daily', 'weekly', 'monthly'
    0 row(s) in 1.1930 seconds
    
    hbase(main):002:0> incr 'counters', '20110101', 'daily:hits', 1
    COUNTER VALUE = 1
    
    hbase(main):003:0> incr 'counters', '20110101', 'daily:hits', 1
    COUNTER VALUE = 2
    
    hbase(main):04:0> get_counter 'counters', '20110101', 'daily:hits'
    COUNTER VALUE = 2
    
  • 잘못된 사용
    hbase(main):001:0> put 'counters', '20110101', 'daily:clicks', '1'
    0 row(s) in 0.0540 seconds
    
    hbase(main):013:0> incr 'counters', '20110101', 'daily:clicks', 1 
    COUNTER VALUE = 3530822107858468865
    
  • 또 다른 사용
    hbase(main):004:0> incr 'counters', '20110101',
    'daily:hits'  
    COUNTER VALUE = 3
    
    hbase(main):005:0> incr 'counters', '20110101', 'daily:hits'
    COUNTER VALUE = 4
    
    hbase(main):006:0> incr 'counters', '20110101', 'daily:hits', 0
    COUNTER VALUE = 4
    
    hbase(main):007:0> incr 'counters', '20110101', 'daily:hits', -1
    COUNTER VALUE = 3
    
    hbase(main):008:0> incr 'counters', '20110101', 'daily:hits', -1
    COUNTER VALUE = 2
    

    4.2.2 단일 카운터

    HTable table = new HTable(conf, "counters");
    long cnt1 = table.incrementColumnValue(Bytes.toBytes("20110101"), 
      Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
    long cnt2 = table.incrementColumnValue(Bytes.toBytes("20110101"), 
      Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
    long current = table.incrementColumnValue(Bytes.toBytes("20110101"), 
      Bytes.toBytes("daily"), Bytes.toBytes("hits"), 0);
    long cnt3 = table.incrementColumnValue(Bytes.toBytes("20110101"), 
      Bytes.toBytes("daily"), Bytes.toBytes("hits"), -1);
    
    // 결과 : cnt1: 1, cnt2: 2, current: 2, cnt3: 1
    

4.2.3 복수 카운터

Example 4-18. Incrementing multiple counters in one row

    Increment increment1 = new Increment(Bytes.toBytes("20110101"));
    increment1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("clicks"), 1);
   increment1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1); 
   increment1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("clicks"), 10);
   increment1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("hits"), 10);
   Result result1 = table.increment(increment1); 
   for (KeyValue kv : result1.raw()) {
     System.out.println("KV: " + kv +
       " Value: " + Bytes.toLong(kv.getValue())); 
   }
   Increment increment2 = new Increment(Bytes.toBytes("20110101"));
   increment2.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("clicks"), 5);
   increment2.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1); 
   increment2.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("clicks"), 0);
   increment2.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("hits"), -5);
   Result result2 = table.increment(increment2);
   for (KeyValue kv : result2.raw()) {
     System.out.println("KV: " + kv +
       " Value: " + Bytes.toLong(kv.getValue()));
   }


KV: 20110101/daily:clicks/1301948275827/Put/vlen=8 Value: 1
KV: 20110101/daily:hits/1301948275827/Put/vlen=8 Value: 1
KV: 20110101/weekly:clicks/1301948275827/Put/vlen=8 Value: 10
KV: 20110101/weekly:hits/1301948275827/Put/vlen=8 Value: 10

KV: 20110101/daily:clicks/1301948275829/Put/vlen=8 Value: 6
KV: 20110101/daily:hits/1301948275829/Put/vlen=8 Value: 2
KV: 20110101/weekly:clicks/1301948275829/Put/vlen=8 Value: 10
KV: 20110101/weekly:hits/1301948275829/Put/vlen=8 Value: 5

4.3 보조 처리기

계산 작업 일부를 데이터가 저장된 서버에 전가.

4.3.1 보조처리기 소개

작업을 전체 클러스터에 분산하는 작은 맵리듀스 프레임워크.
임의의 코드를 각 리전 서버에서 직접 실행하게 해준다.
사용자의 클래스를 사용할 수 있게 jar를 서버에 제공해야 한다. (동적로딩)
용도 : 보조색인, 참조무결성 구현, stateful 한 필터, sql의 sum() avg() 집계함수 구현

HBase 0.92에서 접근제어가 보조처리기로 구현됨

  • 옵저버
    이 유형의 보조처리기는 트리거와 비교된다. 특정이벤트 발생하면 callback method(hook)가 실행됨.

RegionObserver
data manipulation events (regions of a table.)
MasterObserver
administrative or DDL-type operations. (cluster-wide events.)
WALObserver
WAL( write-ahead log processing.)

  • 엔드포인트
    RDB의 stored procedure와 같은 개념.
    옵저버 구현체와 결합할 수 있다.

4.3.2 Coprocessor 인터페이스

모든 보조처리기가 구현해야한다.

Table 4-8. Priorities as defined by the Coprocessor.Priority enumeration

Value Description
SYSTEM Highest priority, defines coprocessors that are executed first
USER Defines all other coprocessors, which are executed subsequently

보조처리기의 생명주기는 프레임웍이 관리. 아래 두 메소드를 제공.

void start(CoprocessorEnvironment env) throws IOException;
void stop(CoprocessorEnvironment env) throws IOException;

*보조처리기는 row에 락을 걸 수 없다.

Coprocessor.State 상태값

설명
UNINSTALLED 보조처리기가 최초 상태에 있음. 아직 환경을 갖지 않았으며 초기화되지 않았음.
INSTALLED 인스턴스가 환경 안에 설치되었음
STARTING 보조처리기 시작 직전상태. 즉 보조 처리기의 start()메소드가 실행되기 직전 상태
ACTIVE start()메소드 호출에 대한 응답이 반환된 상태
STOPPING stop()메소드가 실행되기 직전 상태
STOPPED stop()메소드 호출에 대한 응답이 반환된 상태

4.3.3 보조 처리기 로드

설정파일에 의한 로드

hbase-site.xml에 추가

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>coprocessor.RegionObserverExample, coprocessor.AnotherCoprocessor</value>
</property>
<property>
  <name>hbase.coprocessor.master.classes</name>
  <value>coprocessor.MasterObserverExample</value>
</property>
<property>
  <name>hbase.coprocessor.wal.classes</name>
  <value>coprocessor.WALObserverExample, bar.foo.MyWALObserver</value>
</property>

모두 SYSTEM 우선순위

table descriptor에 의한 로드

해당 테이블이 속한 리전,리전서버에만 로드됨.

 public class LoadWithTableDescriptorExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(fs.getUri() + Path.SEPARATOR + "test.jar"); 
    HTableDescriptor htd = new HTableDescriptor("testtable"); 
    htd.addFamily(new HColumnDescriptor("colfam1"));
    htd.setValue("COPROCESSOR$1", path.toString() +
      "|" + RegionObserverExample.class.getCanonicalName() + 
      "|" + Coprocessor.Priority.USER);
    HBaseAdmin admin = new HBaseAdmin(conf); 
    admin.createTable(htd);
    System.out.println(admin.getTableDescriptor(Bytes.toBytes("testtable"))); 
  }
}

{NAME => 'testtable', COPROCESSOR$1 => \
  'file:/test.jar|coprocessor.RegionObserverExample|USER', FAMILIES => \
  [{NAME => 'colfam1', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', \
  COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE \
  => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}

$뒤에 우선순위, hdfs에 jar지정가능

'COPROCESSOR$1' => \
  'hdfs://localhost:8020/users/leon/test.jar|coprocessor.Test|SYSTEM'
'COPROCESSOR$2' => \
  '/Users/laura/test2.jar|coprocessor.AnotherTest|USER'

4.3.4 RegionObserver 클래스

hook 발생동작 2가지 : 리전 생명주기 변경동작, 클라이언트 api호출동작

리전 생명주기 이벤트 처리


각 상태별 pre,post 메소드가 있음.

클라이언트 API 이벤트 처리

클라이언트에서 리전 서버로 전달됨. 이 API 호출 직전, 직후에 hook.

void preGet(...) / void postGet(...)
void prePut(...) / void postPut(...)
void preDelete(...) / void postDelete(...)
boolean preCheckAndDelete(...) / boolean postCheckAndDelete(...)
void preGetClosestRowBefore(...) / void postGetClosestRowBefore(...)
boolean preExists(...) / boolean postExists(...)
long preIncrementColumnValue(...) / long postIncrementColumnValue(...)
void preIncrement(...) / void postIncrement(...)
InternalScanner preScannerOpen(...) / InternalScanner postScannerOpen(...)
boolean preScannerNext(...) / boolean postScannerNext(...)
void preScannerClose(...) / void postScannerClose(...)

공유 정보 클래스 => RegionCoprocessorEnvironment클래스
상황정보 => ObserverContext 클래스

상황(context) : 모든 보조처리기에 대해 동일 ObserverContext
환경(environment) : 보조처리기 마다 교체 ObserverContext . E getEnvironment()

bybass, complete 는 서버측 프로세스에 영향

사용자 정의 보조 처리기의 기본 클래스 => BaseRegionObserver 클래스

4.3.5 MasterObserver 클래스

DDL연산의 hook

void preCreateTable(...) / void postCreateTable(...)
void preDeleteTable(...) / void postDeleteTable(...)
void preModifyTable(...) / void postModifyTable(...)
void preAddColumn(...) / void postAddColumn(...)
void preModifyColumn(...) / void postModifyColumn(...)
void preDeleteColumn(...) / void postDeleteColumn(...)
void preEnableTable(...) / void postEnableTable(...)
void preDisableTable(...) / void postDisableTable(...)
void preMove(...) / void postMove(...)
void preAssign(...) / void postAssign(...)
void preUnassign(...) / void postUnassign(...)
void preBalance(...) / void postBalance(...)
boolean preBalanceSwitch(...) / void postBalanceSwitch(...)
void preShutdown(...)
void preStopMaster(...)

MasterCoprocessorEnvironment 클래스
보조처리기의 환경 getTable등도 가능

BasMasterObserver 클래스
사용자 정의 기본 클래스

4.3.6 엔드포인트 보조 처리기

집계함수는 위의 기능으로 구현 불가. 여기서 구현해보자.

CoprocessorProtocal 인터페이스

클라이언트에게 사용자 정의 RPC프로토콜을 제공하기 위해 CoprocessorProtocal을 상속
HTable이 제공하는 아래메소드를 통해 보조 처리기 인스턴스와 통신가능

<T extends CoprocessorProtocol> T coprocessorProxy(
  Class<T> protocol, byte[] row) //단일리전
<T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
  Class<T> protocol, byte[] startKey, byte[] endKey,
  Batch.Call<T,R> callable) //특정범위리전
<T extends CoprocessorProtocol, R> void coprocessorExec(
  Class<T> protocol, byte[] startKey, byte[] endKey,
  Batch.Call<T,R> callable, Batch.Callback<R> callback) //callback은 옵션

이 인스턴스는 테이블 내의 개별 리전과 연동되므로 row키를 통해 리전의 이름을 파악한다.
callable이 각 리전에서 수행된다. callback은 옵션

BaseEndpointCoprocessor 클래스

프로토콜 정의

public interface RowCountProtocol extends CoprocessorProtocol {
  long getRowCount() throws IOException;
  long getRowCount(Filter filter) throws IOException;
  long getKeyValueCount() throws IOException;
}

엔드포인트 코프로세서 구현

public class RowCountEndpoint extends BaseEndpointCoprocessor
  implements RowCountProtocol {
  private long getCount(Filter filter, boolean countKeyValues)
    throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(1);
    if (filter != null) {
      scan.setFilter(filter);
    }
    RegionCoprocessorEnvironment environment =
      (RegionCoprocessorEnvironment) getEnvironment();
    // use an internal scanner to perform scanning.
    InternalScanner scanner = environment.getRegion().getScanner(scan);
    int result = 0;
    try {
      List<KeyValue> curVals = new ArrayList<KeyValue>();
      boolean done = false;
      do {
        curVals.clear();
        done = scanner.next(curVals);
        result += countKeyValues ? curVals.size() : 1;
      } while (done);
    } finally {
      scanner.close();
    }
    return result;
  }
  @Override
  public long getRowCount() throws IOException {
    return getRowCount(new FirstKeyOnlyFilter());
  }
  @Override
  public long getRowCount(Filter filter) throws IOException {
    return getCount(filter, false);
  }
  @Override
  public long getKeyValueCount() throws IOException {
    return getCount(null, true);
  }
}

보조처리기 사용

public class EndpointExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "testtable");
    try {
      Map<byte[], Long> results = table.coprocessorExec(
        RowCountProtocol.class, 
        null, null, 
        new Batch.Call<RowCountProtocol, Long>() { 
          @Override
          public Long call(RowCountProtocol counter) throws IOException {
            return counter.getRowCount(); 
          }
        });
      long total = 0;
      for (Map.Entry<byte[], Long> entry : results.entrySet()) { 
        total += entry.getValue().longValue();
        System.out.println("Region: " + Bytes.toString(entry.getKey()) +
          ", Count: " + entry.getValue());
      }
      System.out.println("Total Count: " + total);
    } catch (Throwable throwable) {
      throwable.printStackTrace();
    }
  }
}

기타 고급기능 책 참조

4.4 HTablePool

HTable 인스턴스 생성한 후 재사용이 합리적, 생성하는데 몇 초가 걸리기 때문.
HTable은 thread-safe하지 않아 스레드 여러개가 사용하는건 다른 문제다.
재사용문제는 HTablePool을 이용해라.

Configuration conf = HBaseConfiguration.create()
HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE)
Configuration conf = HBaseConfiguration.create();
HTablePool pool = new HTablePool(conf, 5); 
HTableInterface[] tables = new HTableInterface[10];
for (int n = 0; n < 10; n++) {
      tables[n] = pool.getTable("testtable"); 
      System.out.println(Bytes.toString(tables[n].getTableName()));
}
for (int n = 0; n < 5; n++) {
    pool.putTable(tables[n]); 
}
pool.closeTablePool("testtable");

Acquiring tables...
testtable
testtable
testtable
testtable
testtable
testtable
testtable
testtable
testtable
testtable
Releasing tables...
Closing pool...

4.5 연결 처리

HTable내의 연결 HConnection.
동일한 Configuration이면, 동일한 HConnection 인스턴스를 HBase가 할당.
왜냐면?

  • 주키퍼 연결 공유를 위해
  • 공통자원캐싱
  • HTablePool . 즉, HTable인스턴스 공유
    HTable table1 = new HTable("table1");
    //...
    HTable table2 = new HTable("table2");
    
    //위에보다 아래가 효율적
    
    Configuration conf = HBaseConfiguration.create();
    HTable table1 = new HTable(conf, "table1");
    //...
    HTable table2 = new HTable(conf, "table2");
    

맵리듀스 잡 처럼 매우 분산된 애플리케이션에서는 HTable close()하지 않으면 IOException을 받을 수도 있다.

문서정보

Enter labels to add to this page:
Please wait 
Looking for a label? Just start typing.