Working with a Cassandra database on a Hadoop cluster from Eclipse (creating a Keyspace and writing data)
(1) Download Cassandra. The version I used is apache-cassandra-2.0.13-bin.tar.gz (Hadoop version 1.0.1). Upload it to the Hadoop cluster, unpack it with tar -xzf apache-cassandra-2.0.13-bin.tar.gz, rename the directory to cassandra, and place it under /usr/. Then edit a few files:
vim cassandra.yaml (the file lives under /usr/cassandra/conf/) and change the following fields:
data_file_directories:
    - /usr/cassandra/data
# commit log
commitlog_directory: /usr/cassandra/commitlog
# saved caches
saved_caches_directory: /usr/cassandra/saved_caches
# multiple nodes!
seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          # seeds is actually a comma-delimited list of addresses.
          # Ex: "<ip1>,<ip2>,<ip3>"
          # List the IP addresses of every host in the cluster:
          - seeds: "172.16.2.42,172.16.2.34,172.16.2.54,172.16.2.57"
# Setting this to 0.0.0.0 is always wrong.
# This host's own IP address; every other host must use its own address here.
listen_address: 172.16.2.42
# For security reasons, you should not expose this port to the internet. Firewall it if needed.
# Use the same value on all machines:
rpc_address: 0.0.0.0
auto_bootstrap: true
Next, edit the server log configuration:
vim log4j-server.properties
log4j.appender.R.File=/usr/cassandra/system.log
and the topology file:
vim cassandra-topology.properties
172.16.2.42=DC1:RAC2
172.16.2.34=DC1:RAC2
172.16.2.54=DC1:RAC2
172.16.2.57=DC1:RAC2
With the configuration done, copy the cassandra directory to every other machine in the Hadoop cluster via scp (note: the listen_address IP must be changed on each host to that host's own address). Once Cassandra has been started on every node, nodetool status should show all of them as up and normal.
(2) Configure the classpath. This must be done on every machine in the Hadoop cluster:
vim /etc/profile
#set cassandra path
export CASSANDRA_HOME=/usr/cassandra
export PATH=${CASSANDRA_HOME}/bin:${PATH}
export CLASSPATH=.:$CASSANDRA_HOME/lib/*:$CLASSPATH
(The JVM only expands the classpath wildcard lib/*, not lib/*.jar, so the entry must be written without the .jar suffix. Reload the profile with: source /etc/profile)
(3) Start Cassandra by entering the following command:
[hadoop@Masterpc ~]$ cassandra
If it started successfully, jps will show the CassandraDaemon process, as below:
[hadoop@Masterpc ~]$ jps
3575 CassandraDaemon
(4) Create a Map/Reduce project named CassandraPro, and copy the jar files from Cassandra's lib directory into Hadoop's lib directory. Then create a class named TestClient with the following code:
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
public class TestClient
{
    public static void main(String[] args)
        throws TException, InvalidRequestException,
        UnavailableException, UnsupportedEncodingException,
        NotFoundException, TimedOutException
    {
        // A framed transport wrapping the raw socket.
        //TTransport tr = new TFramedTransport(new TSocket("Masterpc.Hadoop", 9160));
        //Masterpc.Hadoop : 172.16.2.42
        // Any other host in the cluster works too, as long as its Cassandra daemon is running.
        TTransport tr = new TFramedTransport(new TSocket("172.16.2.42", 9160)); // either an IP address or a hostname works
        TProtocol proto = new TBinaryProtocol(tr);
        Cassandra.Client client = new Cassandra.Client(proto);
        tr.open();
        if (!tr.isOpen())
        {
            System.out.println("failed to connect server!");
            return;
        }
        long temp = System.currentTimeMillis();
        client.set_keyspace("demo1"); // switch to the demo1 keyspace
        ColumnParent parent = new ColumnParent("student"); // column family
        /*
         * Insert 10000 rows into student; each row holds an id and a name.
         */
        String key_user_id = "a";
        for (int i = 0; i < 10000; i++)
        {
            String k = key_user_id + i; // row key
            long timestamp = System.currentTimeMillis(); // timestamp
            Column idColumn = new Column(toByteBuffer("id")); // column name
            idColumn.setValue(toByteBuffer(i + "")); // column value
            idColumn.setTimestamp(timestamp);
            client.insert(
                    toByteBuffer(k),
                    parent,
                    idColumn,
                    ConsistencyLevel.ONE);
            Column nameColumn = new Column(toByteBuffer("name"));
            nameColumn.setValue(toByteBuffer("student" + i));
            nameColumn.setTimestamp(timestamp);
            client.insert(
                    toByteBuffer(k),
                    parent,
                    nameColumn,
                    ConsistencyLevel.ONE);
        }
        /*
         * Read a single column of one row.
         */
        ColumnPath path = new ColumnPath("student"); // read from the student column family
        path.setColumn(toByteBuffer("id")); // the id column
        String key3 = "a9999"; // read the row whose key is a9999
        System.out.println(toString(client.get(toByteBuffer(key3), path, ConsistencyLevel.ONE).column.value));
        /*
         * Read the whole row.
         */
        SlicePredicate predicate = new SlicePredicate();
        SliceRange sliceRange = new SliceRange(toByteBuffer(""), toByteBuffer(""), false, 10);
        predicate.setSlice_range(sliceRange);
        List<ColumnOrSuperColumn> results =
                client.get_slice(toByteBuffer(key3), parent, predicate, ConsistencyLevel.ONE);
        for (ColumnOrSuperColumn result : results)
        {
            Column column = result.column;
            System.out.println(toString(column.name) + " -> " + toString(column.value));
        }
        long temp2 = System.currentTimeMillis();
        System.out.println("time: " + (temp2 - temp) + " ms"); // print the elapsed time
        tr.close();
    }
    /*
     * Convert a String to a ByteBuffer so it can be written to Cassandra.
     */
    public static ByteBuffer toByteBuffer(String value)
        throws UnsupportedEncodingException
    {
        return ByteBuffer.wrap(value.getBytes("UTF-8"));
    }
    /*
     * Convert a ByteBuffer back to a String.
     */
    public static String toString(ByteBuffer buffer)
        throws UnsupportedEncodingException
    {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return new String(bytes, "UTF-8");
    }
}
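One weakness of the code above: if any Thrift call throws, tr.close() is never reached and the socket leaks. A minimal sketch of the same connection handling wrapped in try/finally (nothing here beyond standard Java and the classes already imported above):
TTransport tr = new TFramedTransport(new TSocket("172.16.2.42", 9160));
try
{
    tr.open();
    // ... the same Thrift calls as in main() above ...
}
finally
{
    if (tr.isOpen())
        tr.close(); // release the socket even when an insert or read throws
}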
(5) Start the Cassandra daemon on the host you intend to connect to (e.g. 172.16.2.42):
[hadoop@Masterpc ~]$ cassandra
If it started successfully, jps will show the CassandraDaemon process, as below:
[hadoop@Masterpc ~]$ jps
3575 CassandraDaemon
(6) Connect to any node of the multi-node cluster through its IP address and port:
cassandra-cli -host 172.16.2.42 -port 9160
Then create a keyspace:
create keyspace demo1
with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy'
and strategy_options = [{replication_factor:1}];
Next, create a column family with two columns, id and name:
[default@unknown] use demo1;
[default@demo1] CREATE COLUMN FAMILY student
WITH comparator = UTF8Type
AND key_validation_class=UTF8Type
AND column_metadata = [
{column_name: id, validation_class: IntegerType},
{column_name: name, validation_class:UTF8Type}
];
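The same schema can also be created programmatically through Thrift's schema methods instead of cassandra-cli. A minimal sketch, assuming the same node address as before; the class name CreateSchema is hypothetical, and for brevity it omits the per-column validators that the CLI command above declares:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.KsDef;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
public class CreateSchema
{
    public static void main(String[] args) throws Exception
    {
        TTransport tr = new TFramedTransport(new TSocket("172.16.2.42", 9160));
        Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(tr));
        tr.open();
        // Column family definition: keyspace name + CF name, UTF8 comparator and keys.
        CfDef cf = new CfDef("demo1", "student");
        cf.setComparator_type("UTF8Type");
        cf.setKey_validation_class("UTF8Type");
        // Keyspace definition: SimpleStrategy with replication_factor = 1.
        Map<String, String> opts = new HashMap<String, String>();
        opts.put("replication_factor", "1");
        KsDef ks = new KsDef("demo1",
                "org.apache.cassandra.locator.SimpleStrategy",
                new ArrayList<CfDef>());
        ks.setStrategy_options(opts);
        client.system_add_keyspace(ks);       // create the keyspace
        client.set_keyspace("demo1");
        client.system_add_column_family(cf);  // create the column family in it
        tr.close();
    }
}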
(7) Now run the program (note: the Cassandra daemon on the host being connected to must be running). The result:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/HadoopWorkPlat/hadoop/lib/slf4j-log4j12-1.7.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/HadoopWorkPlat/hadoop/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/HadoopWorkPlat/hadoop/lib/slf4j-log4j12-1.4.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/wd/Desktop/hadoop/apache-cassandra-2.0.13-bin/apache-cassandra-2.0.13/lib/slf4j-log4j12-1.7.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
9999
id -> 9999
name -> student9999
time: 36163 ms
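The run takes 36163 ms for 10000 rows largely because each row costs two synchronous insert round trips. The Thrift API also offers batch_mutate, which groups many mutations into one call. A minimal sketch (a snippet, not a complete program) that writes both columns of one row in a single round trip; it assumes the same client and toByteBuffer as in TestClient, plus imports for java.util.ArrayList/HashMap/Map and org.apache.cassandra.thrift.Mutation:
// Row key -> (column family name -> list of mutations).
Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap =
        new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
long timestamp = System.currentTimeMillis();
Column idColumn = new Column(toByteBuffer("id"));
idColumn.setValue(toByteBuffer("9999"));
idColumn.setTimestamp(timestamp);
Column nameColumn = new Column(toByteBuffer("name"));
nameColumn.setValue(toByteBuffer("student9999"));
nameColumn.setTimestamp(timestamp);
List<Mutation> mutations = new ArrayList<Mutation>();
for (Column c : new Column[] { idColumn, nameColumn })
{
    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
    cosc.setColumn(c);
    Mutation m = new Mutation();
    m.setColumn_or_supercolumn(cosc);
    mutations.add(m);
}
Map<String, List<Mutation>> byColumnFamily = new HashMap<String, List<Mutation>>();
byColumnFamily.put("student", mutations);               // column family name
mutationMap.put(toByteBuffer("a9999"), byColumnFamily); // row key
client.batch_mutate(mutationMap, ConsistencyLevel.ONE); // one round trip for both columns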
(8) Inspect the Cassandra database:
[default@demo1] list student;
Using default limit of 100
Using default cell limit of 100
-------------------
RowKey: a4
=> (name=id, value=52, timestamp=1427518050680)
=> (name=name, value=student4, timestamp=1427518050680)
-------------------
RowKey: a12
=> (name=id, value=12594, timestamp=1427518050715)
=> (name=name, value=student12, timestamp=1427518050715)
-------------------
RowKey: a54
=> (name=id, value=13620, timestamp=1427518050921)
=> (name=name, value=student54, timestamp=1427518050921)
-------------------
...... (remaining rows omitted)
Note that the id values above look wrong (52 for student4, 12594 for student12): the id column was declared with validation_class IntegerType, but TestClient writes the UTF-8 string bytes of the number, so cassandra-cli decodes those raw bytes as an integer ("4" is byte 0x34, i.e. 52; "54" is 0x35 0x34, i.e. 13620). Declaring id as UTF8Type, or encoding a real integer on the client side, would make the two agree. The row order also looks arbitrary because the partitioner orders rows by token rather than by key.
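The same listing can be reproduced from Java with get_range_slices. A minimal sketch, reusing client, parent, predicate and the helper methods from TestClient, plus imports for org.apache.cassandra.thrift.KeyRange and org.apache.cassandra.thrift.KeySlice:
KeyRange keyRange = new KeyRange();       // count defaults to 100, like the CLI
keyRange.setStart_key(toByteBuffer(""));  // empty start/end key = scan the whole ring
keyRange.setEnd_key(toByteBuffer(""));
List<KeySlice> rows =
        client.get_range_slices(parent, predicate, keyRange, ConsistencyLevel.ONE);
for (KeySlice row : rows)
{
    System.out.println("RowKey: " + new String(row.getKey(), "UTF-8"));
    for (ColumnOrSuperColumn cosc : row.getColumns())
    {
        Column column = cosc.column;
        System.out.println("  " + toString(column.name) + " -> " + toString(column.value));
    }
}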