ping.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. #!/usr/bin/env python
  2. """
  3. A pure python ping implementation using raw socket.
  4. Note that ICMP messages can only be sent from processes running as root.
  5. Derived from ping.c distributed in Linux's netkit. That code is
  6. copyright (c) 1989 by The Regents of the University of California.
  7. That code is in turn derived from code written by Mike Muuss of the
  8. US Army Ballistic Research Laboratory in December, 1983 and
  9. placed in the public domain. They have my thanks.
  10. Bugs are naturally mine. I'd be glad to hear about them. There are
  11. certainly word-size dependencies here.
  12. Copyright (c) Matthew Dixon Cowles, <http://www.visi.com/~mdc/>.
  13. Distributable under the terms of the GNU General Public License
  14. version 2. Provided with no warranties of any sort.
  15. Original Version from Matthew Dixon Cowles:
  16. -> ftp://ftp.visi.com/users/mdc/ping.py
  17. Rewrite by Jens Diemer:
  18. -> http://www.python-forum.de/post-69122.html#69122
  19. Revision history
  20. ~~~~~~~~~~~~~~~~
  21. March 17, 2016
  22. Changes by Corentin Debains:
  23. - converted to support python 3 and added packaging info
  24. January 27, 2015
  25. Changed receive response to not accept ICMP request messages.
  26. It was possible to receive the very request that was sent.
  27. March 11, 2010
  28. changes by Samuel Stauffer:
  29. - replaced time.clock with default_timer which is set to
  30. time.clock on windows and time.time on other systems.
  31. May 30, 2007
  32. little rewrite by Jens Diemer:
  33. - change socket asterisk import to a normal import
  34. - replace time.time() with time.clock()
  35. - delete "return None" (or change to "return" only)
  36. - in checksum() rename "str" to "source_string"
  37. November 22, 1997
  38. Initial hack. Doesn't do much, but rather than try to guess
  39. what features I (or others) will want in the future, I've only
  40. put in what I need now.
  41. December 16, 1997
  42. For some reason, the checksum bytes are in the wrong order when
  43. this is run under Solaris 2.X for SPARC but it works right under
  44. Linux x86. Since I don't know just what's wrong, I'll swap the
  45. bytes always and then do an htons().
  46. December 4, 2000
  47. Changed the struct.pack() calls to pack the checksum and ID as
  48. unsigned. My thanks to Jerome Poincheval for the fix.
  49. Last commit info:
  50. ~~~~~~~~~~~~~~~~~
  51. $LastChangedDate: $
  52. $Rev: $
  53. $Author: $
  54. """
  55. import os, sys, socket, struct, select, time
  56. if sys.platform == "win32":
  57. # On Windows, the best timer is time.clock()
  58. default_timer = time.clock
  59. else:
  60. # On most other platforms the best timer is time.time()
  61. default_timer = time.time
  62. # From /usr/include/linux/icmp.h; your milage may vary.
  63. ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris.
  64. def checksum(source_string):
  65. """
  66. I'm not too confident that this is right but testing seems
  67. to suggest that it gives the same answers as in_cksum in ping.c
  68. """
  69. sum = 0
  70. countTo = (len(source_string)/2)*2
  71. count = 0
  72. while count<countTo:
  73. thisVal = source_string[count + 1]*256 + source_string[count]
  74. sum = sum + thisVal
  75. sum = sum & 0xffffffff # Necessary?
  76. count = count + 2
  77. if countTo<len(source_string):
  78. sum = sum + source_string[len(source_string) - 1]
  79. sum = sum & 0xffffffff # Necessary?
  80. sum = (sum >> 16) + (sum & 0xffff)
  81. sum = sum + (sum >> 16)
  82. answer = ~sum
  83. answer = answer & 0xffff
  84. # Swap bytes. Bugger me if I know why.
  85. answer = answer >> 8 | (answer << 8 & 0xff00)
  86. return answer
  87. def receive_one_ping(my_socket, ID, timeout):
  88. """
  89. receive the ping from the socket.
  90. """
  91. timeLeft = timeout
  92. while True:
  93. startedSelect = default_timer()
  94. whatReady = select.select([my_socket], [], [], timeLeft)
  95. howLongInSelect = (default_timer() - startedSelect)
  96. if whatReady[0] == []: # Timeout
  97. return
  98. timeReceived = default_timer()
  99. recPacket, addr = my_socket.recvfrom(1024)
  100. icmpHeader = recPacket[20:28]
  101. type, code, checksum, packetID, sequence = struct.unpack(
  102. "bbHHh", icmpHeader
  103. )
  104. # Filters out the echo request itself.
  105. # This can be tested by pinging 127.0.0.1
  106. # You'll see your own request
  107. if type != 8 and packetID == ID:
  108. bytesInDouble = struct.calcsize("d")
  109. timeSent = struct.unpack("d", recPacket[28:28 + bytesInDouble])[0]
  110. return timeReceived - timeSent
  111. timeLeft = timeLeft - howLongInSelect
  112. if timeLeft <= 0:
  113. return
  114. def send_one_ping(my_socket, dest_addr, ID):
  115. """
  116. Send one ping to the given >dest_addr<.
  117. """
  118. dest_addr = socket.gethostbyname(dest_addr)
  119. # Header is type (8), code (8), checksum (16), id (16), sequence (16)
  120. my_checksum = 0
  121. # Make a dummy heder with a 0 checksum.
  122. header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
  123. bytesInDouble = struct.calcsize("d")
  124. data = bytes((192 - bytesInDouble) * "Q", 'utf-8')
  125. data = struct.pack("d", default_timer()) + data
  126. # Calculate the checksum on the data and the dummy header.
  127. my_checksum = checksum(header + data)
  128. # Now that we have the right checksum, we put that in. It's just easier
  129. # to make up a new header than to stuff it into the dummy.
  130. header = struct.pack(
  131. "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
  132. )
  133. packet = header + data
  134. my_socket.sendto(packet, (dest_addr, 1)) # Don't know about the 1
  135. def do_one(dest_addr, timeout):
  136. """
  137. Returns either the delay (in seconds) or none on timeout.
  138. """
  139. icmp = socket.getprotobyname("icmp")
  140. try:
  141. my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
  142. except PermissionError as e:
  143. e.args = (e.args if e.args else tuple()) + ((
  144. " - Note that ICMP messages can only be sent from processes"
  145. " running as root."
  146. ),)
  147. raise
  148. my_ID = os.getpid() & 0xFFFF
  149. send_one_ping(my_socket, dest_addr, my_ID)
  150. delay = receive_one_ping(my_socket, my_ID, timeout)
  151. my_socket.close()
  152. return delay
  153. def verbose_ping(dest_addr, timeout = 2, count = 4):
  154. """
  155. Send >count< ping to >dest_addr< with the given >timeout< and display
  156. the result.
  157. """
  158. for i in range(count):
  159. print("ping %s..." % dest_addr, end=' ')
  160. try:
  161. delay = do_one(dest_addr, timeout)
  162. except socket.gaierror as e:
  163. print("failed. (socket error: '%s')" % e)
  164. break
  165. if delay is None:
  166. print("failed. (timeout within %ssec.)" % timeout)
  167. else:
  168. delay = delay * 1000
  169. print("get ping in %0.4fms" % delay)
  170. print()
  171. if __name__ == '__main__':
  172. verbose_ping("heise.de")
  173. verbose_ping("google.com")
  174. verbose_ping("a-test-url-taht-is-not-available.com")
  175. verbose_ping("192.168.1.1")