Category: Xtreams

Xtreams at Work

Lately I’ve been working on an Xtreams-based implementation of the SSL/TLS protocol, and I think that some specific parts of it could make interesting examples: non-trivial, real-life solutions rather than artificial academic exercises. This is my first attempt to extract such an example, hopefully without getting bogged down in too much detail. Although I will try to explain the constructs used in the example, it does require some basic familiarity with Xtreams; the project documentation can be used as a reference.

As you may know, the SSL/TLS protocol is meant to protect other, higher-level protocols from eavesdropping and tampering. The data payload is simply a stream of bytes; its semantics are not relevant to SSL/TLS at all. The payload gets split into chunks called records, and each record is then individually encrypted and signed to provide the required protection. There is other traffic beyond just the data on an established connection. There are handshake messages for establishment of session keys, etc. A handshake normally happens at the beginning, but can also happen again later on a long-lasting connection, generally to allow refreshing the keying material for improved security. There are also alerts, which allow one side to warn the other about certain conditions (e.g. that the connection is about to be closed). Overall there are four different types of payload that can be carried by a record. The rule is that a single record can carry only one type of payload. However, there are no rules about how the payload is partitioned between records. A handshake record can carry several handshake messages, or a single handshake message can span several handshake records. However, a single record cannot carry both handshake messages and data. From the point of view of the data payload the record boundaries are irrelevant and should be completely transparent. Consequently, the most straightforward way to present the data payload is as one continuous stream of bytes. Of course, the interleaved chunks of non-data payload need to be filtered out and handled accordingly.

Problem definition

To make the solution a bit less convoluted, let’s simplify the problem. Let’s say there aren’t four types of traffic but just two, data and non-data. We also won’t worry about what needs to happen with the non-data; let’s just log it in a separate stream. To have some concrete samples to work with we need to define the record structure. Let’s say a record starts with a boolean indicating whether the record carries data or not, then an integer specifying the size of its contents, and then the contents themselves.

Let’s generate some random samples to work with. Let’s say our records will be anywhere from 0 to 9 bytes long and the contents will always be the sequence from 1 to <size>. If size is 0 the contents will be empty. First let’s make a random generator that will generate an integer between 0 and 9.

random := Random new reading collecting: [ :f | (f * 10) floor ].

The part “Random new reading” yields a stream that uses an instance of Random to generate random floats in the range 0 <= x < 1. The collecting part transforms each float into an integer between 0 and 9. With this we can generate a random sample of 10 records as follows.

sample := Array new writing.
10 timesRepeat: [ | size |
	size := random get.
	sample put: random get even;
		put: size;
		write: (1 to: size) ].
sample close; terminal.

The ‘Array new writing’ bit creates a simple write stream over an Array. As usual, the array will automatically grow as elements are written to it. Closing a collection write stream trims the underlying collection to the actually written size, and the #terminal message returns it (it’s called terminal because it returns whatever is at the bottom of an arbitrarily high stream stack). Here’s one sample generated by the above code.

#(false 4 1 2 3 4 true 4 1 2 3 4 false 3 1 2 3 false 5 1 2 3 4 5 true 2 1 2
 false 8 1 2 3 4 5 6 7 8 true 3 1 2 3 true 3 1 2 3 false 0 false 4 1 2 3 4)

In further text when we refer to ‘sample’ we mean a read stream over such an array, which is created simply by sending #reading to the array.
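
To make the record layout concrete, here is a minimal sketch that writes a single data record by hand and reads it back. It uses only the messages shown above; the variable names out and tiny are made up for illustration.

```smalltalk
| out tiny sample |
"Write one data record by hand: type flag, size, then the contents."
out := Array new writing.
out put: true; put: 3; write: (1 to: 3).
tiny := out close; terminal.
"Read it back the same way the parsing code reads 'sample'."
sample := tiny reading.
sample get.		"the type flag"
sample get.		"the size"
sample read: 3.	"the contents"
```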


So we start with a simple stream and we need to parse the records out of it. That is actually quite simple.

fragments := [ | size |
		isData := sample get.
		size := sample get.
		(sample limiting: size)
			closeBlock: [ :stream | stream -= 0 ];
			yourself
	] reading.

Sending #reading to a block creates a block stream, which produces each element by running the block, the element is the result of the block run. Now let’s take a look at what the block produces. It gets a single element from the sample stream and puts it in variable isData. Assuming the sample stream is aligned with beginning of a record, the first element should be the Boolean indicating the type of the record. Then we get another element from the sample stream, the record size, and use it as a parameter of the #limiting: message. Sending #limiting: to a stream creates a “virtual” substream. The actual contents of the substream come from the underlying stream, the substream just makes sure we don’t read more than the specified limit. The closeBlock is there to make sure that when we close the substream the underlying is positioned at the end of it, i.e. at the beginning of the next record. The argument to the closeBlock: is the substream itself and the expression “-= 0” seeks to the end of itself (read seek 0 bytes from the end of the stream). So the result of the block is a virtual stream of the payload of the current record (called fragment in the SSL/TLS spec). That means that fragments is a stream of record payload streams and global variable isData indicates the type of the current record (i.e. the most recent one we read from fragments).
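
The effect of #limiting: can be seen in isolation on a hand-written array holding two records (the array and the variable names are made up for illustration). Reading the substream to its end leaves the underlying stream positioned at the next record, which is the same position the close block establishes when a fragment is closed before it has been fully read.

```smalltalk
| source fragment payload next |
source := #(true 3 1 2 3 false 2 1 2) reading.
source get.					"type flag of the first record"
fragment := source limiting: source get.	"the size is 3, so the substream ends after three elements"
payload := fragment rest.			"only the payload of the first record"
next := source get.				"the source is now at the type flag of the second record"
```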


Now we can treat each fragment as a stream, but we know that the fragment boundaries are meaningless. If we have an algorithm that parses a handshake message from a stream, we can’t give it a fragment, because the message might be continuing in a subsequent fragment. What we need is to be able to treat a sequence of adjacent fragments of the same type as a single continuous stream, let’s call it a “run”. As it happens, Xtreams has a handy construct to combine several streams into one, it’s called “stitching”. It takes a stream of streams and makes it look like a single continuous stream. For example, the following two expressions yield the same result.

(1 to: 10) reading
(Array with: (1 to: 3) reading with: (4 to: 7) reading with: (8 to: 10) reading) reading stitching
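
A read from a stitched stream can cross the boundary between two substreams, which is exactly the property we want from a run. Here is a small sketch using the same construct, with two made-up fragments of sizes 3 and 5:

```smalltalk
| run firstFive |
run := (Array with: (1 to: 3) reading with: (1 to: 5) reading) reading stitching.
firstFive := run read: 5.	"all of the first fragment plus the first two elements of the second"
```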

Conveniently, fragments is a stream of streams, however we don’t want to stitch it all together. We want to stitch just the adjacent fragments of the same type. So, the result will still be a stream of streams, it’s just a stream of runs, rather than stream of individual fragments. To be able to stitch fragments together we need a stream that will keep handing out fragments while the type is the same and ends when the type changes.

[	fragment := fragments get.
	runType ifNil: [ runType := isData ].
	isData = runType
		ifTrue: [ fragment ]
		ifFalse: [ Incomplete zero raise ]
] reading stitching

We get a fragment, if it’s the first one, we remember its type and we keep returning fragments, until the type changes. When that happens we raise the Incomplete exception, which signals the block stream that it ended, which in turn signals the stitching stream that it ended as well. This however won’t work, because it will consume the first fragment of the following run. We need to rewrite it a bit differently so that the non-matching fragment can be carried over to the next run. We’ll rewrite the construct so that the first fragment of a run is obtained outside of the run stream itself and brought in via an external variable, fragment. That way the first fragment of the next run can be fetched by the final iteration of previous run.

fragment := fragments get.
runType := isData.
[	isData = runType
		ifTrue: [ fragment closing: [ fragment := fragments get ] ]
		ifFalse: [ Incomplete zero raise ]
] reading stitching

The first fragment is read outside of the block stream, we may as well capture the runType at that point. The block stream then simply compares the current isData value to the captured runType, if it matches it returns the current fragment. The difficulty here is that we need to get next fragment after the previous one was read. The best way to achieve that is to fetch the next one in a close block of the previous one. Again if the type changes we raise Incomplete, to signal the end of the run.
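
The way a block stream is ended by raising Incomplete can be tried out on its own. In this sketch (the counter n and the other names are made up for illustration) the block hands out three elements and then signals the end of the stream:

```smalltalk
| n upToThree contents |
n := 0.
upToThree := [
	n = 3 ifTrue: [ Incomplete zero raise ].
	n := n + 1 ] reading.
contents := upToThree rest.	"reads 1, 2 and 3, then stops at the Incomplete"
```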

Now that we know how to build a single run stream, we need to wrap that up in a stream of run streams. A simple block stream returning the stitched run streams should suffice.

fragment := fragments get.
runsFinished := false.
runs := [ | runType |
		runsFinished ifTrue: [ Incomplete zero raise ].
		runType := isData.
		[	isData = runType
				ifTrue: [
					fragment closing: [
						[ fragment := fragments get ] ifCurtailed: [ runsFinished := true ] ] ]
				ifFalse: [ Incomplete zero raise ]
		] reading stitching.
	 ] reading.

The tricky part is ending the stream. A block stream will keep running the block (whenever it is asked for an element) until a block run raises an Incomplete. Here we want this to be the moment when we run out of fragments, i.e. when getting the next fragment raises an Incomplete. However that action is buried inside the close block inside the stitched stream of a run. When it happens the stitched stream will re-interpret that as the end of itself, so the outer block stream cannot distinguish an end of a run from the end of fragments (it is the end of the last run as well after all). So somehow we need to capture the fact that a fragment get raised an Incomplete and bring that information up to the block stream. That’s what the runsFinished variable is for. Without that the runs stream will keep giving out empty runs forever once it runs out of fragments.
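
The role of ifCurtailed: is easier to see without the surrounding streams. In this sketch an Incomplete raised by reading from an empty stream is handled further out, much as the stitched stream handles it in the code above, and the curtail block still gets to record what happened while the stack unwinds (the variable finished is made up for illustration):

```smalltalk
| finished |
finished := false.
[ [ #() reading get ] ifCurtailed: [ finished := true ] ]
	on: Incomplete
	do: [ :ex | nil ].
"finished is now true, even though the exception itself was swallowed by the handler"
```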

To summarize this step, the “runs” stream turns the “fragments” stream into a stream of runs where adjacent fragments of the same type are stitched together into a single continuous stream, a run. With our sample input we should get the following result.

runs collect: [ :r | r rest ]
#(#(1 2 3 4) #(1 2 3 4) #(1 2 3 1 2 3 4 5) #(1 2) #(1 2 3 4 5 6 7 8) #(1 2 3 1 2 3) #(1 2 3 4))

Data vs Control

Now that we have a stream of alternating data and non-data runs, we need to stitch the data runs together and log the non-data runs into a separate stream. For that we just need a simple block stream that gets a run and, if it’s not data, logs it and gets the next one.

control := ByteArray new writing.
data := [ | run |
	run := runs get.
	isData ifFalse: [
		control write: run.
		run := runs get ].
	"answer the data run whether or not a non-data run had to be skipped first"
	run
	] reading stitching.

With our sample we should get the following results.

data rest
#(1 2 3 4 1 2 1 2 3 1 2 3)

control close; terminal
#[1 2 3 4 1 2 3 1 2 3 4 5 1 2 3 4 5 6 7 8 1 2 3 4]

Note that the whole processing happens behind the scenes as we’re reading from the data stream, the “data rest” bit simply reads everything from the data stream. It happens lazily as data is being read, nothing is being cached, so the performance characteristics shouldn’t change even if we pump megabytes of data through it. In fact we can easily rewrite the fragment stream so that the sample is generated on the fly.

sample := Array new writing.
fragments := [ | size |
	sample put: (isData := random get even).
	sample put: (size := random get).
	(1 to: size) reading ] reading.

Here the “sample” stream is used just to log what we’ve generated, so that we can verify that the results are correct. We only log the type and size; the contents are implicit. If we use this version of fragments, we can’t call #rest on the data stream: the fragments never finish, so it would just keep reading forever. Here’s a sample run where we read 100 data bytes instead.

data read: 100
#(1 2 1 2 3 4 1 2 3 4 5 1 1 2 3 4 5 6 7 8 9 1 2 3 4 1 2 1 2 1 2 3 4 1 2 3 4 5 6 1 2 3 4 5 6 7 8 9
1 2 3 4 5 6 7 8 9 1 2 3 1 2 1 2 3 4 5 6 7 1 2 3 4 5 6 1 2 3 4 5 6 7 1 1 2 3 4 5 6 7 1 2 3 4 5 1 2 1 2 3)

sample close; terminal
#(false 6 true 2 false 3 true 4 false 7 true 5 false 5 true 1 false 1 true 9 false 7 true 0 false 8 true 4 false 2
false 8 false 4 true 2 true 2 true 0 false 5 true 4 true 6 true 0 true 9 true 9 false 1 false 6 true 3 false 4 true 2
false 5 true 7 false 7 false 8 true 6 false 8 false 5 false 6 false 2 false 5 false 6 false 4 false 0 true 7 true 1
false 1 true 7 true 5 true 2 false 0 false 1 true 5)

control close; terminal
#[1 2 3 4 5 6 1 2 3 1 2 3 4 5 6 7 1 2 3 4 5 1 1 2 3 4 5 6 7 1 2 3 4 5 6 7 8 1 2 1 2 3 4 5 6 7 8 1 2 3...etc...]

We can also easily profile an arbitrarily large run. Let’s rebuild the fragment stream so that it doesn’t log the samples, otherwise the log will keep growing and skew the results unnecessarily. Similarly, let’s turn the control log into a bit bucket too.

control := nil writing.
TimeProfiler profile: [ nil writing write: 10**7 from: data ]

Here’s a time and allocation profile summary from reading 10MB of data (and about as much control data, assuming a reasonably unbiased random generator).


2083 samples, 17.22 average ms/sample, 4023 scavenges, 0 incGCs, 
5 stack spills, 0 mark stack overflows, 0 weak list overflows, 0 JIT cache spills
34.82s active, 1.0s other processes,
35.87s real time, 0.05s profiling overhead
** Totals **
28.8 Context>>findNextMarkedUpTo:
9.6 Context>>terminateTo:
6.5 BlockClosure>>on:do:
3.8 GenericException class>>handles:
3.1 SequenceableCollection>>replaceElementsFrom:to:withSequenceableCollection:startingAt:
2.8 BlockClosure>>cull:
2.4 ResolvedDeferredBinding>>value
2.3 MarkedMethod>>isMarkedForHandle


1394487 samples, 1045 average bytes/sample, 6760 scavenges, 0 incGCs, 
2 stack spills, 0 mark stack overflows, 0 weak list overflows, 0 JIT cache spills
1458037980 bytes
** Totals **
41.7 GenericException class>>new
19.7 Xtreams.ReadStream class>>on:
17.6 LaggedFibonacciRandom>>nextValue
11.1 [] in UndefinedObject>>unboundMethod
6.2 Interval class>>from:to:by:
3.7 Xtreams.StitchReadStream class>>on:first:

To put the results in some perspective, 20MB of records of average size 5 (0 to 9) means we’ve processed about four million records, each as a stream of its own with a bunch of other virtual streams set up on top. There were lots of lightweight, short-lived objects created in the process: the streams, an Incomplete exception at the end of each stream, the sample intervals representing the contents of each fragment, etc. Apparently the floats generated by the random generator are a significant portion of the profile as well. The profile says that we went through 1.5 GB of objects which, frankly, is a bit more than I’d expect, but the good news is we didn’t trigger a single incremental GC; it was all handled within the scope of new space. Remember also that the total includes sample generation as well; it seems that we can safely attribute at least a quarter of the space cost to that. Either way the runtime image size didn’t spike at all. With a normal SSL/TLS connection where record size goes up to 16K, the same amount of overhead should easily cover 50GB of payload (probably more). So the cost of this rather powerful abstraction, which completely hides the underlying protocol, should be quite reasonable and easily dwarfed by all the other overhead on a typical SSL/TLS connection (encryption, IO, etc).

PS: In case you wonder, I did verify the claim that the amount of control payload roughly corresponds to the amount of data payload. For that I used the handy monitoring stream. I wrapped it around the control stream as follows.

control := nil writing monitoring: [ :total | nonData := total ] every: 1 seconds

The stream runs the monitoring block at the specified intervals providing some optional handy arguments, first of which is the total number of elements that went through the stream. After the profiling run I simply inspected the value of nonData and it was where it should have been, well within 2% of 10M.

– Posted by Martin Kobetic



We are receiving some very encouraging feedback on Xtreams. There’s a fairly complete port to Squeak/Pharo, and people are blogging about it and discussing it on various forums. All that is very welcome and certainly helps reassure Michael and myself that we might be onto something worthwhile, and it keeps us motivated to continue.

At this stage of the game we feel that the core library is reasonably complete, we’re reasonably happy with the API and we’re venturing into experiments where we’d like to prove that the concepts and implementation are good and that the performance goals are achievable as well. Michael created a neat, very light-weight, yet rather complete IRC client over a weekend. My fascination with security protocols led me to attempt an implementation of SSH2.

I chose SSH because I wanted to learn more about the protocol, and wanted to compare it with my previous experience implementing SSL (outside of the context of Xtreams). I also see it as a good target for validation of our performance goals. Secure protocols are naturally layered and that seems to be a rather good fit for an attempt to map that structure onto a stream stack with the socket connection at the bottom, various packet splitting/combining, encryption and hashing layers on top of it, all hopefully coming together into a very simple and transparent binary stream facade. If the abstractions and implementation are right, the stack must behave the same as a simple binary stream and it must not cost much in terms of performance.

So, I’ve been working on this in my spare time for about 2 months now. It was a bit more work than I expected, not because I’ve hit some particularly difficult obstacles (in fact I was making fairly steady progress throughout); I just didn’t really know what I was getting into. SSH is really quite a bit more than just a protocol. It’s a suite of protocols (some documented better than others) with an architectural framework that puts them together for a particular purpose: running a remote shell, executing remote commands, uploading/downloading files, etc. You can’t reasonably compare SSH to SSL as a whole; that would be comparing apples to oranges, or rather comparing an apple to an apple pie. The part of SSH that is roughly comparable to SSL would be the bottom-level transport layer combined with the authentication layer running on top of it. That’s all nice and dandy and I was done with that part in a few weeks, but the problem is, you can’t really use it for anything practical. You could use it for a custom, smalltalk to smalltalk, secure communication channel, but you can’t interoperate with anything else out there.

What I wanted was to be able to upload/download a large file with smalltalk on either the client or server end and be able to measure how long it takes compared to a native C client, like OpenSSH’s scp command. So, to get there I needed to also implement the connection layer, which provides the multiplexed multi-channel capabilities allowing independent data-flows over a single, shared, secure connection. Then I needed to figure out how the scp command uses those facilities to transport files over it, which involved a rather sparsely and incompletely documented SCP protocol and a good deal of trial and error experiments with OpenSSH.

So I’m glad to report that I’m finally seeing the light at the end of the (encrypted) tunnel. At this point I can execute an expression on a Smalltalk server via the ssh command. There isn’t any generic TCP server support built in, so the first half of the example code is just to establish a single TCP connection with a client:

	| listener socket server |
	"This is just to set up a TCP socket connection, nothing to do with SSH2"
	listener := SocketAccessor family: SocketAccessor AF_INET type: SocketAccessor SOCK_STREAM.
	listener soReuseaddr: true.
	listener bindTo: (IPSocketAddress hostAddress: IPSocketAddress thisHost port: 2222).
	[ socket := listener listenFor: 1; accept ] ensure: [ listener close ].
	"Now we have a socket and can set up an SSH2 connection on it, here playing the server side"
	server := SSH2ServerConnection on: socket.
	"This is just to have all SSH messages echoed to transcript"
	server when: SSH2Announcement do: [ :m | Transcript cr; print: m ].
	[ 	"Server normally doesn't do much beyond accepting the client handshake and then waiting for a disconnect.
		Everything is initiated by the client side and handled by background threads handling any established channels."
		server accept; waitForDisconnect
	] ensure: [ server close. socket close ]

The client side interaction looks something like the following:

[mkobetic@latitude ~]$ ssh -p 2222 localhost 3 + 4

Not particularly impressive output so let me also add what this interaction logged into the Transcript (as requested in the example code). It describes the entire message exchange between the client and the server:

-> identification ['Xtreams_Initial_Development']
<- identification ['OpenSSH_5.5']
<- SERVICE_REQUEST ssh-userauth
-> SERVICE_ACCEPT ssh-userauth
<- USERAUTH_REQUEST martin@ssh-connection none
-> USERAUTH_FAILURE #('publickey')
<- USERAUTH_REQUEST martin@ssh-connection publickey ssh-dss 5c:d1:c7:c8:27:48:8c:1a:fe:83:1d:7b:3c:09:49:6d no sig
<- USERAUTH_REQUEST martin@ssh-connection publickey ssh-dss 5c:d1:c7:c8:27:48:8c:1a:fe:83:1d:7b:3c:09:49:6d with sig
<- CHANNEL_OPEN(0) session 2097152/32768
-> CHANNEL_OPEN_CONFIRMATION(0)(0) 2097152/32668
<- CHANNEL_REQUEST(0) !  env LANG -> en_US.utf8
<- CHANNEL_REQUEST(0) ?  exec 3 + 4
<- DISCONNECT 11 disconnected by user

And that is not all, it can also upload/download files or directories in either direction (server -> client, client -> server) or execute remote shell commands from a smalltalk client on a remote OpenSSH server. Here’s an example of how to make a smalltalk client talk to an OpenSSH server. The example includes the code needed to read the default user keys from the $HOME/.ssh directory and to make the socket connection:

	| home user keys socket client config service session |
	"The bulk of this is loading up your personal keys from your $HOME/.ssh directory as they are needed to successfully authenticate with the server"
	home := '$(HOME)' asLogicalFileSpecification asFilename.
	user := home tail.
	keys := SSH2Keys new.
	((home / '.ssh' filesMatching: 'id_*') reject: [ :fn | '*.pub' match: fn ]) do: [ :fn || pub pri |
		pri := fn asFilename readStream.
		pri := ([ CertificateFileReader new readFrom: pri ] ensure: [ pri close ]) any asKey.
		pub := (fn, '.pub') asFilename reading encoding: #ascii.
		(pub ending: $ ) -= 0.
		pub := [ Xtreams.SSH2HostKey readFrom: pub encodingBase64 ssh2Marshaling ] ensure: [ pub close ].
		pub := keys publicKeyFrom: pub.
		keys addPublic: pub private: pri ].
	"Now we have the keys and can set up an SSH configuration to use them."
	config := SSH2Configuration new keys: keys.
	"Create a socket"
	socket := SocketAccessor newTCPclientToHost: 'localhost' port: 22.
	"Set up an SSH client connection on it."
	client := SSH2ClientConnection on: socket.
	client configuration: config.
	"This is just so that all SSH messages are echoed into the Transcript"
	client when: SSH2Announcement do: [ :m | Transcript cr; print: m ].
"	client when: SSH2TransportMessage, SSH2ChannelSetupMessage, CHANNEL_CLOSE do: [ :m | Transcript cr; print: m ].
"	[	"A client has to connect as particular user (using the preconfigured keys) and gets a channel service in response"
		service := client connect: user.
		"A channel service can provide an interactive session or a tunnel.
		You can ask for as many sessions, tunnels as you want, each will get its own channel multiplexed over the same SSH connection."
		session := service session.
		"Given a session you can execute a command, or upload/download a file or directory, etc..."
"		[	session exec: 'ls -l'.
		] ensure: [ session close ].
"		[	[ session scpUploadFrom: '' to: '/dev/shm/' ] timeToRun
		] ensure: [ session close ]
	] ensure: [ client close. socket close ]

I also started playing with a “shell” session with a smalltalk server, but rather than invoking or emulating bash, I wanted to run a simple read/eval/print loop in smalltalk instead. Having that, one could use the ssh command to connect to a smalltalk server securely and execute smalltalk expressions on it. It is basically working as is, except the smalltalk side has to do at least a basic level of terminal emulation. A simple CR returned from the server moves the cursor in the terminal down one line but doesn’t move it back to the left. That one would be easy, but it also seems that the default terminal setup expects the server to echo what is typed into the terminal (I couldn’t see what I was typing in my experiments). So I’ll need yet another piece, a basic terminal emulation layer, to make this work reasonably.

Performance is looking good as well. My primary test is uploading/downloading a reasonably large file using scp. Here’s a transcript of a terminal session uploading a file to both an OpenSSH server and a smalltalk server:

[mkobetic@latitude 78]$ ll
-rw-rw-r-- 1 mkobetic mkobetic 65M Dec 15 16:14
[mkobetic@latitude 78]$ scp mkobetic@localhost:/dev/shm/                                                                               100%   64MB  32.1MB/s   00:02    
[mkobetic@latitude 78]$ scp -P2222 mkobetic@localhost:/dev/shm/                                                                               100%   64MB  21.4MB/s   00:03

And here is the same, just transferring the file in the opposite direction, downloading it from the server:

[mkobetic@latitude 78]$ scp mkobetic@localhost:st/78/ /dev/shm                                                                               100%   64MB  32.1MB/s   00:02    
[mkobetic@latitude 78]$ scp -P2222 /dev/shm                                                                               100%   64MB  32.1MB/s   00:02

The commands with the -P2222 option are the ones running against smalltalk server (2222 was the port where it listened). The upload is somewhat slower (a different data stream setup is used when sending a file and when receiving one), but the download speed is on par. There are several critical aspects that you need to keep in mind when you want an efficient implementation.

1) You can’t come even close to the bulk encryption and hashing speed with a pure smalltalk implementation (at least not with any of the smalltalks that are currently available as far as I know). Just the overhead of indexed variable access in ByteArrays will kill you (last time I looked accessing an indexed instance variable in VisualWorks was about four times slower than accessing a named instance variable). Moreover the other side is most likely calling optimized (possibly pure assembler) implementations from libcrypto or some such. So don’t even try. That’s why we didn’t think twice about implementing the cryptographic streams in Xtreams by calling libcrypto (from OpenSSL) to do the heavy lifting. Arguably that’s cheating, but I don’t think it’s particularly different from calling other low level primitives in the VM. A symmetric cipher (e.g. AES, RC4,…) or a secure hash (SHA, MD5,..) is a specialized bit-twiddling algorithm. Implementing one in smalltalk is educational and fun, but such implementations really aren’t practical in many contexts. There are optimized implementations of all of them available on any OS these days, so I think it’s only reasonable to take advantage of that. Moreover, many application contexts require cryptographic algorithm implementations to be certified (e.g. FIPS 140-2), other applications may require hardware accelerated implementations, so leaving it to external facilities is the most pragmatic choice.

2) Even if you do decide to “outsource” bulk encryption and hashing, you need to do it the right way. Calls outside of smalltalk are expensive, so you want to make them worth it. You cannot call out for every byte or two of data. You must send entire buffers to be processed. Xtreams employs 32K buffers by default. That seems to be sufficiently large to offset any costs of calling C (at least in VW).

3) You must avoid expensive garbage. However note the emphasis on expensive. You don’t need to skimp on every little object. The new space scavenging scheme can chew through megabytes of transient objects in no time. The expensive objects are the ones that make it to the old space but don’t survive too long after that. One particular type of object that tends to fall into that category is the large ByteArray used as a buffer. It doesn’t take too many of those allocated in rapid sequence to overflow the new space, causing many of them to be tenured into the old space. Since they are large they will quickly kick the incremental garbage collector into action. Suddenly you’re spending more time garbage collecting than doing the real work. So it’s critical to reuse buffer objects. If you can’t ensure that within your own code, Xtreams comes with a built-in RecyclingCenter, which serves as an overflow staging area for buffers, so that they can be picked up and reused when the application is chewing through a lot of them.

And that’s it, that’s what I believe are the essential ingredients needed to make Xtreams able to measure up to plain C. And it seems that the results confirm that. So, where to go from here? I still have a few implementation issues listed in the Xtreams-SSH2 package comment. I’d like to add the necessary bit of terminal emulation to make the ssh shell session with a smalltalk server possible. I may add TCP tunneling support, just for completeness, we’ll see. I definitely want to experiment with different approaches for implementing the protocol state machine. I don’t like what I have in Xtreams-SSH2 now (or what’s in the SSL implementation either). I’m still searching for an approach that I’ll like, and once I figure it out I might do Xtreams-TLS as well.

Regarding the future of the Xtreams-SSH2 package, I’m not sure how useful it can be in practice (assuming all is done and polished). Do you think you’d use it for scp upload/download directly to/from smalltalk? Would you use a secure login into a smalltalk server? I don’t think there’s much point in building yet another general purpose SSH server/client, OpenSSH already does that job rather well. Where I think it might be interesting are smalltalk specific projects and applications. For example SSH has this notion of “subsystems” and you can define your own. The only one I know of currently is the sftp subsystem. But the sky is the limit in terms of coming up with new ones. Anyway, if you have ideas for useful applications of a native smalltalk SSH implementation, let me know.

I might write a few more posts on particular implementation details, either from the point of view of how to solve a particular problem using Xtreams, or just as an educational bit about SSH in general. If there’s something about this project that interests you, let me know. I should add that, should you feel particularly bored and want to try this out, the package is available in the Cincom Public Repository. It should work immediately in any sufficiently recent release of VisualWorks. The code should be fairly well portable to Squeak/Pharo, but it depends on Xtreams-Xtras that weren’t ported yet. I tried to contain the VisualWorks specific bits in the SSH2Keys class which encapsulates the use of RSA/DSA keys and algorithms and currently relies on the VisualWorks Security library. I hope to get around to retargeting it onto the EVP primitives in libcrypto, which would make it the same sort of deal as Xtreams-Xtras (possibly eventually merged into it as well).

– Posted by Martin Kobetic


Xtreams: Concatenating Streams

Did you ever run into a situation where you had a stream and some previously written chunk of code that could process the stream almost as is, if only the stream included a few additional bytes at the beginning? Usually, I ended up just biting my lip and fetching the full content of the stream, prepending the missing bits and then setting up an internal stream on top of the collection. That’s assuming it was feasible to load the entire stream into memory. Wouldn’t it be lovely if I could simply prepend a stream in front of another stream and make the two look like one? Let’s give it a try.

One of the things I think we did get right with Xtreams is that it’s xtremely easy to create full-featured subclasses of ReadStream and WriteStream. In the case of ReadStream the only methods you are required to implement are contentsSpecies and read:into:at:. That will give you a complete (non-positionable) stream. So let’s make a CompositeReadStream that adds the following inst vars:

	source2  second source
	active  the currently active source

This stream should not be created with just on:, so let’s declare that #shouldNotImplement and add on:and: instead:

	on: aSource and: aSource2

		active := source := aSource.
		source2 := aSource2

With that in place it would be a mortal sin to not add ReadStream>>,

	, aReadStream
		"Return a read stream that combines self and @aReadStream into a single stream.
			((1 to: 5) reading, (6 to: 10) reading) rest
		"
		^CompositeReadStream on: self and: aReadStream

The sample in the comment above shows how it’s intended to be used. Obviously we want the composite to produce the combined sequence of the elements from both sources. To get that we just need to implement read:into:at:

	read: anInteger into: aSequenceableCollection at: startIndex

		| count |
		count := 0.
		[	^active read: anInteger into: aSequenceableCollection at: startIndex
		] on: Incomplete do: [ :ex |
			count := ex count.
			active == source ifFalse: [ ex pass ].
			active := source2 ].
		"avoid making the recursive call in the handler"
		^[	self read: anInteger - count into: aSequenceableCollection at: startIndex + count
		] on: Incomplete do: [ :ex |
			(Incomplete on: aSequenceableCollection count: count + ex count at: startIndex) raise ]

The idea is simple, we start reading from source, when we run out of source we switch to source2.

To satisfy all implementation requirements we also need contentsSpecies; we’ll follow the species of the underlying source stream:

	contentsSpecies

		^source contentsSpecies
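
As a quick check of the scenario from the introduction, here is a minimal sketch of prepending a few bytes to an existing stream. The byte values and the variable names header and body are made up for illustration; only reading, the comma and rest come from the code above.

```smalltalk
| header body stream |
"Hypothetical data: two prefix bytes that some existing parsing code expects up front"
header := #[16r16 16r03] reading.
"Stands in for the stream we already have, e.g. one coming from a socket"
body := #[1 2 3 4] reading.
"The composite reads header first and switches to body when header runs out"
stream := header, body.
stream rest
```

If the composite behaves as described, this should answer a single ByteArray holding the two header bytes followed by the four body bytes, without the body ever being copied into an intermediate collection first.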

And that’s it. We can do something very similar for WriteStreams, although note that it only makes sense to concatenate streams of limited growth, e.g.:

	| stream |
	stream := (String new writing limiting: 1), (String new writing limiting: 6).
	stream := (String new writing limiting: 5), stream.
	stream write: 'Hello World!'; close; terminal

yielding #('Hello' (' ' 'World!')). If the first write stream grows without restrictions, then you’ll keep writing into that one and never into the second one.

Now this was too easy, let’s try something more xtreme. If we could make the composite stream add additional sub-streams on demand, we could use it for example to cut up an arbitrary sentence into words. One way to achieve that is to have the second stream in the composite be something that can turn itself into another composite with itself in the second position again. As soon as the first stream fills up we need to trigger the transformation of this stream prototype in the second position into the same kind of composite as the one we started with. This kind of setup can accommodate arbitrarily long input on demand.

Let’s call this prototype stream a ProtoWriteStream. Obviously the prescription for how to turn it into a real stream is a block. For transparency let’s trigger the transformation with any write-related message send. Here’s the corresponding code for ProtoWriteStream as a direct subclass of WriteStream.

	write: anInteger from: aSequenceableCollection at: startIndex

		self become: destination value.
		^self write: anInteger from: aSequenceableCollection at: startIndex


	contentsSpecies

		self become: destination value.
		^self contentsSpecies

We’re re-using the destination slot to hold the transformation block, that way the on: creation method can be reused as well. To make it easier to create stream prototypes, let’s add BlockClosure>>writingPrototype as well.

	writingPrototype

		^Xtreams.ProtoWriteStream on: self

Additionally we need to override close and flush to be no-ops, and then we can accomplish the stated task as follows.

	| prototype stream words |
	words := OrderedCollection new.
	prototype :=
		[	((words add: String new) writing ending: Character space)
			, prototype writingPrototype ].
	stream := prototype value.
	stream write: 'the quick brown fox jumps over the lazy dog'; close.
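
The no-op overrides of close and flush mentioned above could be as simple as the following sketch for ProtoWriteStream. This is an assumption about their shape rather than the actual code from the package. Presumably the point is that closing or flushing the enclosing composite should not trigger yet another transformation of the prototype sitting in its second position.

```smalltalk
close
	"Nothing to close yet: the prototype has not been turned into a real stream"

	^self

flush
	"Nothing is buffered in a prototype, so there is nothing to flush"

	^self
```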

Similarly we can play the same game with read streams. Let’s try to re-compose the words into a single stream.

	| prototype stream words |
	words := ('the quick brown fox jumps over the lazy dog' tokensBasedOn: Character space) reading.
	prototype := [ words get reading, prototype readingPrototype ].
	stream := prototype value.
	stream rest

Obviously we could simply create all streams and concatenate them at once:

	(('the quick brown fox jumps over the lazy dog' tokensBasedOn: Character space)
		inject: '' reading into: [ :all :word | all, word reading ]
	) rest

However the prototype based solution has the advantage of creating the sub-streams lazily, so if you don’t need to consume the whole input, you don’t waste the extra effort on the part that you’ll just throw away.
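
To see the laziness at work, here is a small variation on the read stream example that consumes only part of the input. It is a sketch, assuming that readingPrototype mirrors the writingPrototype shown earlier and that read: pulls elements through the composite the same way rest does; the variable names head and remaining are just for illustration.

```smalltalk
| prototype stream words head remaining |
words := ('the quick brown fox jumps over the lazy dog' tokensBasedOn: Character space) reading.
prototype := [ words get reading, prototype readingPrototype ].
stream := prototype value.
"Consume only the first eight characters of the combined stream"
head := stream read: 8.
"Tokens for which no sub-stream was ever created are still waiting in words"
remaining := words rest.
Array with: head with: remaining
```

If everything works as described, head should hold the characters of the first two words and remaining should still contain the other seven tokens, because the prototype only pulls the next word from words when the previous sub-stream runs out.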

If you want to play with these concepts, the concatenation support is now part of Xtreams-Xtras. The CompositeReadStream is even positionable if all its components are positionable as well. But I’m less confident about that part and haven’t even implemented it for write streams yet. The proto streams are available in a new package Xtreams-Xperiments we’ve started. You’ll get it automatically if you load the whole XtreamsDevelopment bundle.

– Posted by Martin Kobetic