javafx并发初步
javafx.concurrency并发包是为方便javafx Application Thread也就是javafx的UI线程与后台线程安全交互的工具包。
接口:Worker.
Worker接口封装了Worker.State枚举作为Worker的状态:READY,SCHEDULED,RUNNING,SUCCEDED,FAILED,CANCELLED.
Worker拥有下面这些只读property.
Worker同时有这些状态对应的Proeprt那就是state,以便于JavaFX Application Thread可以进行observable.
Worker还拥有除了状态对应的property之外几个个有用的property:progress,totalWork,message,title,value,exception,running, workDone
Woker有三个直接实现:Task与Service,ScheduledService,根据不同的实现决定Worker的可复用性,Task是不可以重用的,只能被执行一次,而Service可以重复执行。
类:Task,Service, ScheduledService,他们都实现了EventTarget接口,并拥有addEventHandler()方法,可以检测通过WorkerStateEvent对象进行检测状态的变化。当然也可以通过property Bindings机制代替。
Task:
Task继承了java.util,concurrency.FutureTask类。
Task是不可以重复执行的Worker
运行方式:1. new Thread(task).start();
2.ExecutorService.submit(task);
3.其他。
Task中有许多updateXXX方法用于改变相应的property。setOnXXX方法用于设置EventHandler.
继承Task需要覆盖其call()方法,call()方法会在后台线程中被调用。
由于java中并没有安全的线程中断机制,所以你必须在call()方法内部对isCancelled()进行判断
Task<Integer> task = new Task<Integer>() {
@Override protected Integer call() throws Exception {
int iterations;
for (iterations = 0; iterations < 100000; iterations++) {
if (isCancelled()) {
break;
}
System.out.println("Iteration " + iterations);
}
return iterations;
}如果你想在线程中运行Thread.sleep(),那么你就需要捕获InterruptException并进行isCancelled的两次检查。
Task<Integer> task = new Task<Integer>() {
@Override protected Integer call() throws Exception {
int iterations;
for (iterations = 0; iterations < 1000; iterations++) {
if (isCancelled()) {
updateMessage("Cancelled");
break;
}
updateMessage("Iteration " + iterations);
updateProgress(iterations, 1000);
//Block the thread for a short time, but be sure
//to check the InterruptedException for cancellation
try {
Thread.sleep(100);
} catch (InterruptedException interrupted) {
if (isCancelled()) {
updateMessage("Cancelled");
break;
}典型的例子就是更新进度的问题:这里通过property bindings机制
Task task = new Task<Void>() {
@Override public Void call() {
static final int max = 1000000;
for (int i=1; i<=max; i++) {
if (isCancelled()) {
break;
}
updateProgress(i, max);
}
return null;
}
};
ProgressBar bar = new ProgressBar();
bar.progressProperty().bind(task.progressProperty());
new Thread(task).start();注意:updateProgress(workDone,totalWork);它会同时更新progress,totalWork,workDone三个property.
Service:
Service会封装Task,并负责后台执行它封装的Task。
Service是可重复执行的。
Service可以被注入Executor依赖,并通过注入的Executor执行实际的Task。否则Service只会构造一个简单的Thread执行。
Service通过Service的start()方法执行任务。
可以通过WorkerStateEvent监听变化。
private static class FirstLineService extends Service<String> {
private StringProperty url = new SimpleStringProperty();
public final void setUrl(String value) {
url.set(value);
}
public final String getUrl() {
return url.get();
}
public final StringProperty urlProperty() {
return url;
}
@Override
protected Task<String> createTask() {
return new Task<String>() {
@Override
protected String call()
throws IOException, MalformedURLException {
try ( BufferedReader in = new BufferedReader(
new InputStreamReader(
new URL(getUrl()).openStream;
in = new BufferedReader(
new InputStreamReader(u.openStream()))) {
return in.readLine();
} FirstLineService service = new FirstLineService();
service.setUrl("http://google.com");
service.setOnSucceeded(new EventHandler<WorkerStateEvent>() {
@Override
public void handle(WorkerStateEvent t) {
System.out.println("done:" + t.getSource().getValue());
}
});
service.start();ScheduledService:
ScheduledService是一个可以自动重新开始的任务执行器。ScheduledService在fails失败时是否会自动重新开始以及自动执行的次数取决于:restartOnFailure,backoffStrategy,cumulativePeriod,maximumFailureCount几个property,
ScheduledService<Document> svc = new ScheduledService<>(Duration.seconds(1)) {
protected Task<Document> createTask() {
return new Task<Document>() {
protected Document call() {
// Connect to a Server
// Get the XML document
// Parse it into a document
return document;
}
}
}
}对于确实需要在另外一个线程与Application Thread进行交互的可以使用Platform.runLater(new Runnable(){});