淘宝开源项目TbSchedule的部署和使用
tbschedule项目其实可以分为两部分:
- schedule管理控制台。负责控制、监控任务执行状态
- 实际执行job的客户端程序。
在实际使用时,首先要启动zookeeper, 然后部署tbschedule web界面的管理控制台,最后启动实际执行job的客户机器。这里zookeeper并不实际控制任务调度,它只是负责与N台执行job的客户端通讯,协调、管理、监控这些机器的运行信息。实际分配任务的是tbschedule管理控制台,控制台从zookeeper获取job的运行信息。
tbSchedule通过控制ZNode
的创建、修改、删除来间接控制Job的执行,执行Job的客户端会监听它们对应ZNode
的状态更新事件,从而达到通过tbSchedule控制Job执行的目的。
部署zookeeper
去http://zookeeper.apache.org/releases.html#download下载最新稳定版本。下载完成后解压,将 /conf
目录下的XXX.cfg
更名为zoo.cfg
,因为zookeeper启动时会在这个目录下找zoo.cfg
读取配置信息。这个文件里有几个重要的参数需要说明一下:
tickTime=2000
- 定义时间计量单位。这里表示一个
tick
为2秒。以后在配置时间相关的东西时,都是以tick
为单位的。
- 定义时间计量单位。这里表示一个
dataDir=/tmp
- 定义快照(snapshot)文件的存储位置。zookeeper会将节点信息定时写入到这个目录中。这个目录必须存在,否则启动时会报错。
clientPort=2181
- 指定客户端连接端口。 zookeeper会在这个端口监听连接请求。
server.1=127.0.0.1:2000:3000
- 这个参数仅在集群部署时起作用。格式为:
server.id=host:port:port
。id
表示服务器的唯一标识,一般从1开始计数。第一个port
表示zookeeper集群机器之间的通讯端口,第二个port
表示当集群机器在选举leader
时使用的通讯端口。只有当集群第一次启动,或master
机崩溃时,才会进行leader
选举。
- 这个参数仅在集群部署时起作用。格式为:
配置完成后,切换到/bin
目录,执行:
./zkServer.sh start
即可启动zookeeper,默认会在后台运行,如果想在前端运行,需要执行:
./zkServer.sh start-foreground
Zookeeper集群部署
集群部署时,除了需要指定zoo.cfg
中server.X:XXXX:XX:XX
参数外,还要在每台机器的dataDir
目录下创建一个名为myid
的文件,内容为当前机器的标识数字,与server.X
中的X
相同。完成配置后,依次启动每个zookeeper即可。
注意,当你在启动第一个zookeeper时控制台会大量报错,这是因为其它的zookeeper还没有启动。无视即可。
tbSchedule控制台部署
tbSchedule就是个用servlet/JSP
写的web项目,我们可以直接把war
包部署到tomcat
中,然后在浏览器访问
http://localhost:8080/ScheduleConsole
即可。
如果你想手动编译、构建项目而不是使用war
包,要小心一个坑,那就是执行
mvn clean install -Dmaven.test.skip=true
时maven
会报找不到构件的错误。查其原因,是因为这个项目太老了,当时是用maven2构建的,项目中用到的依赖版本也比较老了,而且它们所在的repository已经停用了,因此无法自动下载。解决方法,直接exclusion
缺少的依赖即可:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.3.3</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
</exclusions>
</dependency>
第一次访问控制台时会出现以下配置页面:
第一行指定zookeeper的地址、端口,第二行是超时时间。用户名和密码在这里没有任何用处,无视即可。要注意的是第三行Zookeeper的根目录,这并不是指的部署zookeeper时指定的dataDir
,而是一个你自己指定的、与当前管理控制台在同一个机器上的目录,tbSchedule管理控制台会将任务的配置信息(如执行开始时间,调度策略)保存到该目录下,这样下次启动管理控制台时就可以直接从目录中读取配置信息了。
填写完成后点保存
,此时上面会出现一行红字,无视之。直接点击管理主页
即可进入管理页面:
至此tbSchedule控制台部署完毕。
tbSchedule客户端编写
tbSchedule项目的test/
目录下有很多测试类,可以执行
mvn test
把测试跑一遍。跑之前要修改项目中schedule.xml
文件正确填写zookeeper的连接地址。测试跑通则说明tbSchedule管理控制台和zookeeper都部署无误。
当我们要执行一个job时,需要创建新项目,引入tbschedule
依赖,实现指定接口,然后打成jar
包,通过
java -jar 你的jar名.jar
启动job。依赖如下:
<groupId>com.taobao.pamirs.schedule</groupId>
<artifactId>tbschedule</artifactId>
<version>3.2.16</version>
同时别忘了集成spring。
Main方法代码如下:
public static void main(String[] args) throws Exception {
ApplicationContext ctx = new ClassPathXmlApplicationContext(
"spring-config.xml");
TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();
Properties p = new Properties();
p.put("zkConnectString", "192.168.3.117:2181");
p.put("rootPath", "/home/platform/data");
p.put("zkSessionTimeout", "60000");
p.put("userName", "ScheduleAdmin");
p.put("password", "password");
p.put("isCheckParentPath", "true");
scheduleManagerFactory.setApplicationContext(ctx);
scheduleManagerFactory.init(p);
scheduleManagerFactory.setZkConfig(convert(p));
}
执行这个方法后,你的程序就会向zookeeper发起连接,注册当前机器,请求任务队列,最后根据调度配置执行job。
job的执行代码需要配置成一个spring bean
:
<bean id="testTaskBean" class="com.anzhi.schedule.task.TestTaskBean" >
<property name="jdbcTemplate" ref="jdbcTemplate" />
</bean>
这个bean的骨架如下:
public class TestTaskBean implements IScheduleTaskDealSingle<PassportModel> {
/**
* 选择任务. 从DB中读取数据, 将取出的数据返回
* @param taskParameter
* @param ownSign
* @param taskItemNum
* @param taskItemList
* @param eachFetchDataNum
* @return
* @throws Exception
*/
public List<PassportModel> selectTasks(String taskParameter, String ownSign,
int taskItemNum, List<TaskItemDefine> taskItemList,
int eachFetchDataNum) throws Exception {
List<PassportModel> list = new ArrayList<PassportModel>(1);
list.add(new PassportModel());
return list;
}
/**
* 向目标表中插入数据
* @param model
* @param ownSign
* @return
* @throws Exception
*/
public boolean execute(PassportModel model, String ownSign)
throws Exception {
try {
//insertData(model);
System.out.println("执行任务");
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
}
其中泛型参数Passport
是我们自定义的类。
tbSchedule的调用流程为:
- 执行
selectTasks()
方法,该方法返回一个List
对象,表示你选择出的任务列表。 - 执行
execute()
方法,tbschedule会遍历你在selectTasks()
方法中返回的List
,然后对每一个元素都调用execute()
方法。
任务调度的配置
进入tbSchedule管理控制台,创建一个新策略:
注意,任务名称格式为创建的任务名$自定义字符串
。其中你自定义的字符串会被传递到selectTasks()
方法中的ownSign
参数中。
任务管理 -> 创建新任务:
这是对单个job的调度配置信息。
重要参数说明:
- 任务名称
- 就是刚才在配置策略时填写的名称(
$
之前的部分)。
- 就是刚才在配置策略时填写的名称(
- 处理模式
SLEEP
模式
- 当某一个线程任务处理完毕,从任务池中取不到任务的时候,检查其它线程是否处于活动状态。如果是,则自己休眠; 如果其它线程都已经因为没有任务进入休眠,当前线程是最后一个活动线程的时候,就调用业务接口,获取需要处理的任务,放入任务池中, 同时唤醒其它休眠线程开始工作。
NOTSLEEP
模式
- 当一个线程任务处理完毕,从任务池中取不到任务的时候,立即调用业务接口获取需要处理的任务,放入任务池中。
- 任务项
- 一个线程组(会有多个线程)只执行一个任务。
- 一个任务项只能由一个任务处理器执行
- 任务处理器是一个逻辑性的概念,一个任务处理器只有一个线程组。
- 所以可以把任务划分为1,2,3,4,5,6,7,8, 9, 10一共10个任务碎片,10个任务碎片会被分配到10(刚才在
创建调度策略
中配置的)个线程组,那么每个线程组对应1个任务碎片,运行时任务项参数又被传递到bean任务类selectTasks方法的List queryCondition参数,例如第1个线程组调用selectTasks方法是queryCondition参数条件为1 ,第2个线程组执行参数条件为2。 我们需要在方法中自己解析这个数值,根据值的不同执行不同部分的任务。因为一个线程组会有多个线程,因此可以实现并行计算。
执行job
完成以上工作后,运行编写的job客户端,job即可被调度执行。