-
Notifications
You must be signed in to change notification settings - Fork 336
Adding stream listeners to sniff change in child #50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Hi @rizasif, This is awesome! Thanks for doing it. I will look it over in the next couple of days. Also since this is essentially a new API, we need to get it through some of our internal review processes before we can merge it. It's going to take some time, but please hang tight. In the meantime, if you can address the lint errors reported at https://travis-ci.org/firebase/firebase-admin-python/jobs/254848028 that would be great. |
Also please do provide some unit tests to start with. This will help us review this change, and also make changes to it in safe manner. |
Sure, I’ll get back to you on this 😊
Sent from Mail for Windows 10
From: Hiranya Jayathilaka
Sent: Tuesday, July 18, 2017 10:52 PM
To: firebase/firebase-admin-python
Cc: Rizwan Asif; Mention
Subject: Re: [firebase/firebase-admin-python] Adding stream listerners tosniff change in child (#50)
Also please do provide some unit tests to start with. This will help us review this change, and also make changes to it in safe manner.
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub, or mute the thread.
|
@hiranya911 I not getting how this error is appearing, can you guide me how can I reproduce it on my system? |
@rizasif This is a lint error. Try running |
Is there a way to get the ETag when streaming the changes @hiranya911? If so, this seems quite relevant to the pull request that I made. |
Hi @alexanderwhatley. I believe it's not possible to retrieve ETags when using the streaming API (@caseycrogers or @rockwotj please confirm). Usually we treat transactions and event listeners as two separate features -- although one may use them together to implement certain complex use cases. Did you have any particular use case in mind? |
You are correct, the streaming API a single direction - the server sending events to the client. ETags are not sent from this endpoint, you'll have to use the normal REST requests for this. (The clients get around this by computing the hashes locally on the devices). |
@hiranya911, yeah, I was thinking of a situation where the user is storing data in a cache, and would stream data to update the cache, and would use the updated ETag to check whether or not the data in the cache was stale. On a more general note, it would seem like a useful addition to the REST API would be the inclusion of methods that allow the user to retrieve the ETag itself without the corresponding data at some location. @rockwotj, you mention that the ETags are computed as hashes locally if I am understanding you correctly, is the hash function exposed anywhere? |
The open source iOS client is the only place that has it, but I think it was tweaked slightly for REST. The hash function is pretty complicated however, I don't recommend trying to port it to other platforms. If you just want to ETag and not the data, you can always do ?print=silent |
@hiranya911 @rizasif May I know what's left to do for this PR to get merged and released? I want to see how I can help here. One quick question about streaming from Firebase RESTful API. Just trying my luck to see if you guys know anything about it. With my modified version of PyRebase, the streaming works mostly fine except for getting "502 Server Error: Bad Gateway for url..." from Firebase occasionally. On average, it occurs 2-3 days after I restart the stream listener process on my server. But sometimes it occurs in the same day. Note that the Firebase query that I set up for streaming uses |
@tamakisquare I believe this PR has all the key elements necessary to support event listeners. The public API and the implementation could use a bit of tinkering. It also need to be rebased to the latest head. The biggest thing that is holding this PR back right now is the lack of any tests and insufficient documentation. If you or somebody can spend some time to provide those we can start the code review process for it. I'm not sure why you're getting 502 errors. Perhaps @rockwotj can shed some light on that matter. |
@tamakisquare the 502 are most likely when we deploy to your database host. We have measure in place to prevent this from happening, but the progress is slow. Clients should try and reconnect with backoff, but expect this to become less of an issue as time goes on. |
Hello people, |
@hiranya911 @rizasif @rockwotj. Thanks for the information and your quick response. Good to learn that the issue is under your team's radar and that it will eventually ease off. Is this issue tracked somewhere that is accessible to others like me? |
Sorry no public status tracker or promises on timeline. |
@tamakisquare I think the issue mentioned by @rizasif is due to extending the |
@hiranya911 Thanks for the tip. I'll keep that in mind when I look into the PR. @rockwotj No problem. All is good. Thanks for the quick responses. Cheers. |
The sseclient used in this PR is a clone of the sseclient used in PyRebase. The trouble with that is it does not come with any tests. To save the trouble from writing tests for some module that I am not familiar with, I have decided to work with another SSE client that comes with its own set of tests. The work is in progress. I will submit another PR once the work is done. |
@tamakisquare +1 for using a third-party library for SSE support. It looks like the one you've chosen works readily with the requests library, which we use in the Admin SDK. |
@tamakisquare what is the progress? If you need anything done then I am available. |
@rizasif - The development work is almost done. I got carried away by other things, so I haven't worked on it in the past couple weeks. I'll get back to it soon. I could use help with writing tests. Would that be something you could help me with? |
@tamakisquare I recommend you share the details of your implementation (at least the public API), as early as possible. All public APIs require explicit approval from my team, and it would be good to get that process started before you put in a ton of effort into writing tests. |
Also once we have some designs and code, I can also probably help out with writing tests. |
Still interested in this! |
We don't have cycles to put into this right now. But if somebody wants to pick it up and carry on I can help get it reviewed and approved. |
waiting for this to merge soon as it's very needy for ARM-Linux based IoT apllications, it's working nice in Pyrebase with some chnages. |
@sphonala you can use my repository in the meanwhile it's working fine for us https://github.com/rizasif/firebase-admin-python |
This PR needs a bit of cleaning up, and at least some unit tests. I don't have the time to work on it myself right now, but if @rizasif or somebody else can help take this PR to maturity, I'll gladly help to get it through the code and API review process. |
Hello everyone, I have been working on resolving the access token issue being faced. And it has been resolved; the token is being refreshed now. I have updated the code in the recent version of firebase_admin_python. Here is the link: https://github.com/Aqsa-K/firebase-admin-python The updates in this version:
I am currently trying to run the tests and I am facing some issues. @hiranya911 I would appreciate some help in this regard. Can you please guide me on how to run the current tests? This will help me write unittests for the 'streaming' changes. |
@Aqsa-K thanks for the update. Instructions for running tests are mostly available in the |
self.stream_handler(msg_data) | ||
|
||
def close(self): | ||
while not self.sse and not hasattr(self.sse, 'resp'): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be or
instead of and
?
return self | ||
|
||
def start_stream(self): | ||
self.sse = ClosableSSEClient(self.url, session=self.make_session(), build_headers=self.build_headers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This causes an error if pyrebase is not installed.
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/local/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.5/site-packages/firebase_admin/db.py", line 129, in start_stream
self.sse = ClosableSSEClient(self.url, session=self.make_session(), build_headers=self.build_headers)
File "/usr/local/lib/python3.5/site-packages/firebase_admin/db.py", line 88, in __init__
super(ClosableSSEClient, self).__init__(*args, **kwargs)
File "/usr/local/lib/python3.5/site-packages/sseclient.py", line 39, in __init__
self._connect()
File "/usr/local/lib/python3.5/site-packages/firebase_admin/db.py", line 94, in _connect
super(ClosableSSEClient, self)._connect()
File "/usr/local/lib/python3.5/site-packages/sseclient.py", line 47, in _connect
self.resp = requester.get(self.url, stream=True, **self.requests_kwargs)
File "/usr/local/lib/python3.5/site-packages/requests/sessions.py", line 521, in get
return self.request('GET', url, **kwargs)
TypeError: request() got an unexpected keyword argument 'build_headers'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me check. I'll get back to you on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi - Any luck with this one, I am getting exactly the same error. What is the fix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I have resolved this issue. I'll share the changes by tonight or tomorrow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome. Thanks. Waiting for the patch. By the way. This is an awesome library.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vijjuk
Yes firebase-admin is much better. I shifted from pyrebase to firebase-admin.
firebase-admin is using sse-client library to listen for changes in firebase database.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello, can I ask what is the status with this pull request? Is it going to be merged? I am developing an app, that requires the server to listen to changes in the firebase database. @Aqsa-K I tried your fork repo, somehow it fires off an event as soon as I set the stream()
and the callback function is called immediately. But the intended listening event is triggered every time when the data is changed in the database side.
>>> ref.stream(callbackfunc)
<firebase_admin.db.Stream object at 0x10c9b7ef0>
>>> {'event': 'put', 'data': True, 'path': '/'}
>>> # The event above is fired off automatically on stream without modification on db
>>> # these 3 events below are fired off by the changes in the db.
>>> {'event': 'put', 'data': 'true', 'path': '/'}
>>> {'event': 'put', 'data': 'false', 'path': '/'}
>>> {'event': 'put', 'data': 'true', 'path': '/'}
>>> self.db.reference("/users/dev1/successful").stream(callbackfunc)
<firebase_admin.db.Stream object at 0x10ef897f0>
>>> {'event': 'put', 'data': True, 'path': '/'}
>>> # The event above is fired off automatically on stream without modification on db
Is it possible to fix this unexpected behavior? I would like to help fix it, but I have no idea where to start looking and I have never worked with SSE before. If anyone can point me in correct direction, please do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The first event is fired initially to retrieve all of the current data in the database.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@the-c0d3r you can separate out the first event which is fired initially by filtering the type 'event' or the type 'path' in the message received by callback function.
If you intend to look for changes in your data, then that should have the type 'event' as 'put'.
Let me know if this is still unclear and you need more help with this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @Aqsa-K Thanks for your comment. I have implemented functionality to filter the events and to only fire them if the value changes. I also created a PR on your fork. I noticed a few things on the SSE client. When I close the stream, it takes about 10 to 20 seconds for it to return. The following close function triggers some attribute error as well. So I catch them with an exception.
def close(self):
self.should_connect = False
self.retry = 0
try:
self.resp.close()
self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR)
self.resp.raw._fp.fp.raw._sock.close()
except AttributeError:
pass
def close(self):
while not self.sse and not hasattr(self.sse, 'resp'):
time.sleep(0.001)
self.sse.running = False
self.sse.close()
self.thread.join()
# return self
But the problem still remains, the closing of the stream takes too long for a single threaded operation for my app. The close()
function of the stream waits for the sse client thread to join. I think that takes up the most time. Is there any way to force the closing of the stream? I need to watch the changes based on the users' query (which differs everytime), then after getting results, I have to dispose of the stream. One workaround I can think of is to do that in a threadpool, so the thread can wait on the sse client closing process, but that does not seem to be the optimal way to me. Or am I doing anything wrong by trying to close?
The following is the way I dispose of the streams
streamRef = db.reference("/path/to/obj").stream(lambda x: print(x))
# attempt some changes on the path
streamRef.close()
# takes from 10 to 20 seconds here to return
Thanks for reading.
Thank you. Do I need to git pull the changes. Please let me know from which branch . Or can I do a pip update ?
…Sent from my iPhone
On 13-Apr-2018, at 3:21 PM, Aqsa Kausar ***@***.***> wrote:
@Aqsa-K commented on this pull request.
In firebase_admin/db.py:
> + self.start()
+
+ def make_session(self):
+ """
+ Return a custom session object to be passed to the ClosableSSEClient.
+ """
+ session = KeepAuthSession()
+ return session
+
+ def start(self):
+ self.thread = threading.Thread(target=self.start_stream)
+ self.thread.start()
+ return self
+
+ def start_stream(self):
+ self.sse = ClosableSSEClient(self.url, session=self.make_session(), build_headers=self.build_headers)
Hey @vijjuk
Sorry for the delay
Can you kindly give this one a try, it's working at my end.
link: https://github.com/Aqsa-K/firebase-admin-python
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub, or mute the thread.
|
Oh!!! I just realized, I am using pyrebase and I was getting this error there..not in firebase-admin…do u recommend firebase-admin?? how do u do SSE? can u give me an example code. not able to find it
… On Apr 14, 2018, at 11:31 AM, Aqsa Kausar ***@***.***> wrote:
@Aqsa-K commented on this pull request.
In firebase_admin/db.py <#50 (comment)>:
> + self.start()
+
+ def make_session(self):
+ """
+ Return a custom session object to be passed to the ClosableSSEClient.
+ """
+ session = KeepAuthSession()
+ return session
+
+ def start(self):
+ self.thread = threading.Thread(target=self.start_stream)
+ self.thread.start()
+ return self
+
+ def start_stream(self):
+ self.sse = ClosableSSEClient(self.url, session=self.make_session(), build_headers=self.build_headers)
@vijjuk <https://github.com/vijjuk>
You cannot pip update this right now. You'll have to git pull it from here: https://github.com/Aqsa-K/firebase-admin-python <https://github.com/Aqsa-K/firebase-admin-python>
Once you have pulled the changes, remember to run setup.py as follows
python setup.py build
python setup.py install
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub <#50 (comment)>, or mute the thread <https://github.com/notifications/unsubscribe-auth/AGR4A1li1Pk9lDqVoi4GRcILBohx4sbNks5toZC0gaJpZM4ObUDz>.
|
I moved to Firebase-admin. I agree with you. It’s much easier and simpler. Loving it. It’s all working fine now. Thank you.
…Sent from my iPhone
On 14-Apr-2018, at 11:04 PM, Aqsa Kausar ***@***.***> wrote:
@Aqsa-K commented on this pull request.
In firebase_admin/db.py:
> + self.start()
+
+ def make_session(self):
+ """
+ Return a custom session object to be passed to the ClosableSSEClient.
+ """
+ session = KeepAuthSession()
+ return session
+
+ def start(self):
+ self.thread = threading.Thread(target=self.start_stream)
+ self.thread.start()
+ return self
+
+ def start_stream(self):
+ self.sse = ClosableSSEClient(self.url, session=self.make_session(), build_headers=self.build_headers)
@vijjuk
Yes firebase-admin is much better. I shifted from pyrebase to firebase-admin.
firebase-admin is using sse-client library to listen for changes in firebase database.
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub, or mute the thread.
|
There's some great work here. What is the progress with the PR though? Are we any closer to merging it, if at all? This is a gentle reminder that this PR is celebrating its 1 year birthday today 🎂. |
Can't accept a PR with no tests and breaks the build due to lint errors. If somebody's willing to put in the effort to get it up to standard, then we can give it a try. |
Hi @hiranya911 I have fixed up most of the lint errors (unused, too long, no docstrings, wrong import sequence, etc) from db.py and sseclient.py . I couldn't manage to fix the following lint errors. I do not know what other alternatives can be done for those protected variables.
How do I go about writing unit testing for this kind of client-server module? What other things do I need to do, in order to complete this PR? And do I create a new PR with the latest master's commits? I'm quite new to contributing to open source projects, please bear with me. |
Hi @the-c0d3r. Thanks for offering to contribute. Here are few pointers:
|
Closing since #183 was merged |
Hello,
I noticed that there (according to my best knowledge) hasn't been a stream listener implemented in the project. Which has a sole responsibility to fire up a callback function (https://firebase.google.com/docs/reference/rest/database/#callback) when any change to a firebase node is occurred. This is essential for developers who want to perform an action when a change to database is observed.
The changes I proposed are inspired by PyRebase (https://github.com/thisbejim/Pyrebase). I have implemented a SSEClient which uses python request library to listen for changes. These changes have been tested using a simulation of 100 users, performing read and write operations. The results, so far, are promising with 0% failure rate.
I have written a test code for you as well. The following adds a new child node to the already available parent "users". Now, if you visit the firebase console and delete this new child node, then the "call_function" will fire up. Similar, case can be observed by updating, adding or deleting in the child of "users" node through the console.
Hope to hear from you soon, thanks!