昨天,隔壁大娘教了我五个字母,今天一兴奋,决定和机翻配合用下。不要问为什么全漏了,我倒是要会呀!!放一个鬼在这。
Func<Action, Action<int[]>> iter = new Func<Action,Action<int[]>>
(x => new Action<int[]>(y => { foreach (int i in y) x(i); }));
var OutputArray = iter(x => Console.WriteLine(x));
int[] data = { 1, 2, 3, 4, 5 };
OutputArray(data);
Why Rx?
通常,unity的网络操作需要使用到WWW和协程(Coroutine),但是,使用协程(Coroutine)对于异步操作而言不是一个好的办法,原因如下:
协程不能返回任何值,因为它的返回类型必须是IEnumerator。
协程不能够捕捉异常,因为yield return 声明不支持try-catch结构。
Introduction
下面的代码,通过UniRx的实现了双击操作:
var clickStream = Observable.EveryUpdate()
.Where(_ => Input.GetMouseButtonDown(0));
clickStream.Buffer(clickStream.Throttle(TimeSpan.FromMilliseconds(250)))
.Where(xs => xs.Count >= 2)
.Subscribe(xs => Debug.Log(“DoubleClick Detected! Count:” + xs.Count));
这个例子演示了以下的特征:
游戏循环作为事件流
事件流可以组合
合并自身事件流
基于时间操作处理很简单
Network operations
对异步网络操作使用ObservableWWW ,它的Get/Post 函数返回可预见可观测:
ObservableWWW.Get(“http://google.co.jp/”)
.Subscribe(
x => Debug.Log(x.Substring(0, 100)), // onSuccess
ex => Debug.LogException(ex)); // onError
Rx可以组合并可以撤销,你也能通过LINQ表达式查询:
// composing asynchronous sequence with LINQ query expressions
var query = from google in ObservableWWW.Get(“http://google.com/”)
from bing in ObservableWWW.Get(“http://bing.com/”)
from unknown in ObservableWWW.Get(google + bing)
select new { google, bing, unknown };
var cancel = query.Subscribe(x => Debug.Log(x));
// Call Dispose is cancel.
cancel.Dispose();
对所有的并行请求使用Observable.WhenAll :
// Observable.WhenAll is for parallel asynchronous operation
// (It’s like Observable.Zip but specialized for single async operations like Task.WhenAll)
var parallel = Observable.WhenAll(
ObservableWWW.Get(“http://google.com/”),
ObservableWWW.Get(“http://bing.com/”),
ObservableWWW.Get(“http://unity3d.com/”));
parallel.Subscribe(xs =>
{
Debug.Log(xs[0].Substring(0, 100)); // google
Debug.Log(xs[1].Substring(0, 100)); // bing
Debug.Log(xs[2].Substring(0, 100)); // unity
});
获得进度信息:
// notifier for progress use ScheudledNotifier or new Progress(/* action */)
var progressNotifier = new ScheduledNotifier();
progressNotifier.Subscribe(x => Debug.Log(x)); // write www.progress
// pass notifier to WWW.Get/Post
ObservableWWW.Get(“http://google.com/”, progress: progressNotifier).Subscribe();
错误处理:
// If WWW has .error, ObservableWWW throws WWWErrorException to onError pipeline.
// WWWErrorException has RawErrorMessage, HasResponse, StatusCode, ResponseHeaders
ObservableWWW.Get(“http://www.google.com/404”)
.CatchIgnore((WWWErrorException ex) =>
{
Debug.Log(ex.RawErrorMessage);
if (ex.HasResponse)
{
Debug.Log(ex.StatusCode);
}
foreach (var item in ex.ResponseHeaders)
{
Debug.Log(item.Key + “:” + item.Value);
}
})
.Subscribe();
Using with IEnumerators (Coroutines)
IEnumerator (协程)是unity原始的异步工具。UniRx 集成了协程并可观测,你能在协程里写异步代码,并使用UniRx编排,这是控制异步流的最佳办法。
// two coroutines
IEnumerator AsyncA()
{
Debug.Log(“a start”);
yield return new WaitForSeconds(1);
Debug.Log(“a end”);
}
IEnumerator AsyncB()
{
Debug.Log(“b start”);
yield return new WaitForEndOfFrame();
Debug.Log(“b end”);
}
// main code
// Observable.FromCoroutine converts IEnumerator to Observable.
// You can also use the shorthand, AsyncA().ToObservable()
// after AsyncA completes, run AsyncB as a continuous routine.
// UniRx expands SelectMany(IEnumerator) as SelectMany(IEnumerator.ToObservable())
var cancel = Observable.FromCoroutine(AsyncA)
.SelectMany(AsyncB)
.Subscribe();
// you can stop a coroutine by calling your subscription’s Dispose.
cancel.Dispose();
假如在Unity5.3,你能使用ToYieldInstruction 对协程观测。
IEnumerator TestNewCustomYieldInstruction()
{
// wait Rx Observable.
yield return Observable.Timer(TimeSpan.FromSeconds(1)).ToYieldInstruction();
// you can change the scheduler(this is ignore Time.scale)
yield return Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.MainThreadIgnoreTimeScale).ToYieldInstruction();
// get return value from ObservableYieldInstruction
var o = ObservableWWW.Get("http://unity3d.com/").ToYieldInstruction(throwOnError: false);
yield return o;
if (o.HasError) { Debug.Log(o.Error.ToString()); }
if (o.HasResult) { Debug.Log(o.Result); }
// other sample(wait until transform.position.y >= 100)
yield return this.transform.ObserveEveryValueChanged(x => x.position).FirstOrDefault(p => p.y >= 100).ToYieldInstruction();
}
通常,我们需要协程返回一个值就必须使用回调,Observable.FromCoroutine 能转换协程通过可撤销的IObservable[T] 替代
// public method
public static IObservable GetWWW(string url)
{
// convert coroutine to IObservable
return Observable.FromCoroutine((observer, cancellationToken) => GetWWWCore(url, observer, cancellationToken));
}
// IObserver is a callback publisher
// Note: IObserver’s basic scheme is “OnNext* (OnError | Oncompleted)?”
static IEnumerator GetWWWCore(string url, IObserver observer, CancellationToken cancellationToken)
{
var www = new UnityEngine.WWW(url);
while (!www.isDone && !cancellationToken.IsCancellationRequested)
{
yield return null;
}
if (cancellationToken.IsCancellationRequested) yield break;
if (www.error != null)
{
observer.OnError(new Exception(www.error));
}
else
{
observer.OnNext(www.text);
observer.OnCompleted(); // IObserver needs OnCompleted after OnNext!
}
}
这里有一些更多的例子,接下来是一个多OnNext模式。
public static IObservable ToObservable(this UnityEngine.AsyncOperation asyncOperation)
{
if (asyncOperation == null) throw new ArgumentNullException(“asyncOperation”);
return Observable.FromCoroutine<float>((observer, cancellationToken) => RunAsyncOperation(asyncOperation, observer, cancellationToken));
}
static IEnumerator RunAsyncOperation(UnityEngine.AsyncOperation asyncOperation, IObserver observer, CancellationToken cancellationToken)
{
while (!asyncOperation.isDone && !cancellationToken.IsCancellationRequested)
{
observer.OnNext(asyncOperation.progress);
yield return null;
}
if (!cancellationToken.IsCancellationRequested)
{
observer.OnNext(asyncOperation.progress); // push 100%
observer.OnCompleted();
}
}
// usecase
Application.LoadLevelAsync(“testscene”)
.ToObservable()
.Do(x => Debug.Log(x)) // output progress
.Last() // last sequence is load completed
.Subscribe();
Using for MultiThreading
// Observable.Start is start factory methods on specified scheduler
// default is on ThreadPool
var heavyMethod = Observable.Start(() =>
{
// heavy method…
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
return 10;
});
var heavyMethod2 = Observable.Start(() =>
{
// heavy method…
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(3));
return 10;
});
// Join and await two other thread values
Observable.WhenAll(heavyMethod, heavyMethod2)
.ObserveOnMainThread() // return to main thread
.Subscribe(xs =>
{
// Unity can’t touch GameObject from other thread
// but use ObserveOnMainThread, you can touch GameObject naturally.
(GameObject.Find(“myGuiText”)).guiText.text = xs[0] + “:” + xs[1];
});
DefaultScheduler
UniRx默认的基于时间操作使用Scheduler.MainThread作为他们的调度程序,这意味着大多数操作(除了Observable.Start)在单线程工作,所以ObserverOn 不需要并且线程安全措施可以忽略。这对于标准的RxNet实施并不相同,但是更合适Unity环境。
Scheduler.MainThread运行受时间缩放的影响,如果你想忽略时间缩放,使用Scheduler.MainThreadIgnoreTimeScale代替。
MonoBehaviour triggers
UniRx通过UniRx.Triggers能处理MonoBehavirour事件:
using UniRx;
using UniRx.Triggers; // need UniRx.Triggers namespace
public class MyComponent : MonoBehaviour
{
void Start()
{
// Get the plain object
var cube = GameObject.CreatePrimitive(PrimitiveType.Cube);
// Add ObservableXxxTrigger for handle MonoBehaviour's event as Observable
cube.AddComponent<ObservableUpdateTrigger>()
.UpdateAsObservable()
.SampleFrame(30)
.Subscribe(x => Debug.Log("cube"), () => Debug.Log("destroy"));
// destroy after 3 second:)
GameObject.Destroy(cube, 3f);
}
}
支持的触发器有:ObservableAnimatorTrigger, ObservableCollision2DTrigger, ObservableCollisionTrigger, ObservableDestroyTrigger, ObservableEnableTrigger, ObservableFixedUpdateTrigger, ObservableUpdateTrigger, ObservableLastUpdateTrigger, ObservableMouseTrigger, ObservableTrigger2DTrigger, ObservableTriggerTrigger, ObservableVisibleTrigger, ObservableTransformChangedTrigger, ObservableRectTransformTrigger, ObservableCanvasGroupChangedTrigger, ObservableStateMachineTrigger, ObservableEventTrigger.
这些通过在组件/游戏物体上扩展方法也可以跟容易的处理。这些方法自动注入ObservableTrigger 除了ObservableEventTrigger 和ObservableStateMachineTrigger
using UniRx;
using UniRx.Triggers; // need UniRx.Triggers namespace for extend gameObejct
public class DragAndDropOnce : MonoBehaviour
{
void Start()
{
// All events can subscribe by ***AsObservable
this.OnMouseDownAsObservable()
.SelectMany(_ => this.UpdateAsObservable())
.TakeUntil(this.OnMouseUpAsObservable())
.Select(_ => Input.mousePosition)
.Subscribe(x => Debug.Log(x));
}
}
Creating custom triggers
处理Unity事件转到Observable是最好的方法,如果UniRx提供的标准触发器还不够,你能创建自定义触发器,为了证明,这里有一个uGUI长按触发器:
public class ObservableLongPointerDownTrigger : ObservableTriggerBase, IPointerDownHandler, IPointerUpHandler
{
public float IntervalSecond = 1f;
Subject<Unit> onLongPointerDown;
float? raiseTime;
void Update()
{
if (raiseTime != null && raiseTime <= Time.realtimeSinceStartup)
{
if (onLongPointerDown != null) onLongPointerDown.OnNext(Unit.Default);
raiseTime = null;
}
}
void IPointerDownHandler.OnPointerDown(PointerEventData eventData)
{
raiseTime = Time.realtimeSinceStartup + IntervalSecond;
}
void IPointerUpHandler.OnPointerUp(PointerEventData eventData)
{
raiseTime = null;
}
public IObservable<Unit> OnLongPointerDownAsObservable()
{
return onLongPointerDown ?? (onLongPointerDown = new Subject<Unit>());
}
protected override void RaiseOnCompletedOnDestroy()
{
if (onLongPointerDown != null)
{
onLongPointerDown.OnCompleted();
}
}
}
它能像标准触发器一样容易使用
var trigger = button.AddComponent();
trigger.OnLongPointerDownAsObservable().Subscribe();
Observable Lifecycle Management
什么时候呼叫OnCompleted ?当使用UniRx时考虑订阅的生命周期管理是非常重要的。ObservableTriggers呼叫OnCompleted时,附加的游戏对象被破坏,其它静态产生方法(Observable.Timer, Observable.EveryUpdate, 等等)不会自动停止,并且它们的订阅应该手动管理。
Rx提供一些辅助方法,比如IDisposable.AddTo允许你一次销毁若干订阅。
// CompositeDisposable is similar with List, manage multiple IDisposable
CompositeDisposable disposables = new CompositeDisposable(); // field
void Start()
{
Observable.EveryUpdate().Subscribe(x => Debug.Log(x)).AddTo(disposables);
}
void OnTriggerEnter(Collider other)
{
// .Clear() => Dispose is called for all inner disposables, and the list is cleared.
// .Dispose() => Dispose is called for all inner disposables, and Dispose is called immediately after additional Adds.
disposables.Clear();
}
如果你想在游戏对象销毁时自动销毁,使用AddTo(游戏对象/组件)
void Start()
{
Observable.IntervalFrame(30).Subscribe(x => Debug.Log(x)).AddTo(this);
}
AddTo促进自动销毁,如果你需要在管线里特殊处理OnCompleted,使用TakeWhile, TakeUntil, TakeUntilDestroy 和TakeUntilDisable 替代:
Observable.IntervalFrame(30).TakeUntilDisable(this)
.Subscribe(x => Debug.Log(x), () => Debug.Log(“completed!”));
如果你处理事件,Repeat是一个重要但是危险的方法,可能会导致一个无限循环,所以小心处理:
using UniRx;
using UniRx.Triggers;
public class DangerousDragAndDrop : MonoBehaviour
{
void Start()
{
this.gameObject.OnMouseDownAsObservable()
.SelectMany(_ => this.gameObject.UpdateAsObservable())
.TakeUntil(this.gameObject.OnMouseUpAsObservable())
.Select(_ => Input.mousePosition)
.Repeat() // dangerous!!! Repeat cause infinite repeat subscribe at GameObject was destroyed.(If in UnityEditor, Editor is freezed)
.Subscribe(x => Debug.Log(x));
}
}
UniRx提供额外的安全重复方法,RepeatSafe:如果连续的“OnComplete”触发重复停止。RepeatUntilDestroy(gameObject/component), RepeatUntilDisable(gameObject/component)当目标游戏对象被销毁时允许停止。
this.gameObject.OnMouseDownAsObservable()
.SelectMany(_ => this.gameObject.UpdateAsObservable())
.TakeUntil(this.gameObject.OnMouseUpAsObservable())
.Select(_ => Input.mousePosition)
.RepeatUntilDestroy(this) // safety way
.Subscribe(x => Debug.Log(x));
如果订阅在订阅中,不分离事件的话。UniRx保证观察热点有持久的异常,这是怎么一回事?
button.OnClickAsObservable().Subscribe(_ =>
{
// If throws error in inner subscribe, but doesn’t detached OnClick event.
ObservableWWW.Get(“htttp://error/”).Subscribe(x =>
{
Debug.Log(x);
});
});
这种行为有时是有用的,如用户事件处理。
所有类的实例提供了一个observeeveryvaluechanged方法,观察每一帧值的改变。
// watch position change
this.transform.ObserveEveryValueChanged(x => x.position).Subscribe(x => Debug.Log(x));
这是非常有用的。如果观察的目标是一个游戏对象,当目标被摧毁它将停止观察,呼叫onCompleted。如果观察的目标是一个普通的C #对象,OnCompleted被垃圾回收。
Converting Unity callbacks to IObservables
对于异步操作使用主题(或异步主题)
public class LogCallback
{
public string Condition;
public string StackTrace;
public UnityEngine.LogType LogType;
}
public static class LogHelper
{
static Subject subject;
public static IObservable<LogCallback> LogCallbackAsObservable()
{
if (subject == null)
{
subject = new Subject<LogCallback>();
// Publish to Subject in callback
UnityEngine.Application.RegisterLogCallback((condition, stackTrace, type) =>
{
subject.OnNext(new LogCallback { Condition = condition, StackTrace = stackTrace, LogType = type });
});
}
return subject.AsObservable();
}
}
// method is separatable and composable
LogHelper.LogCallbackAsObservable()
.Where(x => x.LogType == LogType.Warning)
.Subscribe();
LogHelper.LogCallbackAsObservable()
.Where(x => x.LogType == LogType.Error)
.Subscribe();
在Unity5,Application.RegisterLogCallback 移除 in favor of Application.logMessageReceived,所以我们简单的使用 Observable.FromEvent.
public static IObservable LogCallbackAsObservable()
{
return Observable.FromEvent<Application.LogCallback, LogCallback>(
h => (condition, stackTrace, type) => h(new LogCallback { Condition = condition, StackTrace = stackTrace, LogType = type }),
h => Application.logMessageReceived += h, h => Application.logMessageReceived -= h);
}
Stream Logger
// using UniRx.Diagnostics;
// logger is threadsafe, define per class with name.
static readonly Logger logger = new Logger(“Sample11”);
// call once at applicationinit
public static void ApplicationInitialize()
{
// Log as Stream, UniRx.Diagnostics.ObservableLogger.Listener is IObservable
// You can subscribe and output to any place.
ObservableLogger.Listener.LogToUnityDebug();
// for example, filter only Exception and upload to web.
// (make custom sink(IObserver<EventEntry>) is better to use)
ObservableLogger.Listener
.Where(x => x.LogType == LogType.Exception)
.Subscribe(x =>
{
// ObservableWWW.Post("", null).Subscribe();
});
}
// Debug is write only DebugBuild.
logger.Debug(“Debug Message”);
// or other logging methods
logger.Log(“Message”);
logger.Exception(new Exception(“test exception”));
Unity-specific Extra Gems
// Unity’s singleton UiThread Queue Scheduler
Scheduler.MainThreadScheduler
ObserveOnMainThread()/SubscribeOnMainThread()
// Global StartCoroutine runner
MainThreadDispatcher.StartCoroutine(enumerator)
// convert Coroutine to IObservable
Observable.FromCoroutine((observer, token) => enumerator(observer, token));
// convert IObservable to Coroutine
yield return Observable.Range(1, 10).ToYieldInstruction(); // after Unity 5.3, before can use StartAsCoroutine()
// Lifetime hooks
Observable.EveryApplicationPause();
Observable.EveryApplicationFocus();
Observable.OnceApplicationQuit();
Framecount-based time operators
UniRx提供一些基于帧数的时间操作:
Method
EveryUpdate
EveryFixedUpdate
EveryEndOfFrame
EveryGameObjectUpdate
EveryLateUpdate
ObserveOnMainThread
NextFrame
IntervalFrame
TimerFrame
DelayFrame
SampleFrame
ThrottleFrame
ThrottleFirstFrame
TimeoutFrame
DelayFrameSubscription
例如:延迟调用一次:
Observable.TimerFrame(100).Subscribe(_ => Debug.Log(“after 100 frame”));
每个方法执行的顺序是
EveryGameObjectUpdate(in MainThreadDispatcher’s Execution Order) ->
EveryUpdate ->
EveryLateUpdate ->
EveryEndOfFrame
如果Caller在MainThreadDispatcher.Update前被呼叫,EveryGameObjectUpdate 调用同一帧(我建议MainThreadDispatcher 第一个被呼叫(ScriptExecutionOrder makes -32000)
EveryLateUpdate, EveryEndOfFrame 调用同一帧。
EveryUpdate, 调用下一帧。.
MicroCoroutine
Microcoroutine内存高效并快速协程的worker,这个实现基于Unity 10000次轮询。避免管理-非管理开销,所以快10倍。基于帧时间操作和ObserveEveryValueChanged时MicroCoroutine 自动使用。
如果你想使用MicroCoroutine 代替标准的unity协程,使用MainThreadDispatcher.StartUpdateMicroCoroutine 或 Observable.FromMicroCoroutine.
int counter;
IEnumerator Worker()
{
while(true)
{
counter++;
yield return null;
}
}
void Start()
{
for(var i = 0; i < 10000; i++)
{
// fast, memory efficient
MainThreadDispatcher.StartUpdateMicroCoroutine(Worker());
// slow...
// StartCoroutine(Worker());
}
}
Image title
MicroCoroutine的限制,仅支持yield return null并且更新时间在开始方法里被确定(StartUpdateMicroCoroutine, StartFixedUpdateMicroCoroutine, StartEndOfFrameMicroCoroutine).如果你结合其他的IObservable,你能像结束一样检测完成属性。
IEnumerator MicroCoroutineWithToYieldInstruction()
{
var www = ObservableWWW.Get(“http://aaa”).ToYieldInstruction();
while (!(www.HasResult || www.IsCanceled || www.HasError))
{
yield return null;
}
if (www.HasResult)
{
UnityEngine.Debug.Log(www.Result);
}
}
uGUI Integration